Skip to content

Commit

Permalink
Update ballista-mvp part2 blog
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Apr 22, 2024
1 parent 9d7a667 commit bbb39b7
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions content/blog/2024-04-17-ballista-mvp-part2/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ date = 2024-04-19
draft = false
+++

## 生成分布式执行计划
**原始 SQL**
```sql
SELECT customer.c_custkey, sum(orders.o_totalprice) as total_amount
Expand All @@ -30,6 +31,7 @@ ProjectionExec: expr=[c_custkey@0 as c_custkey, SUM(orders.o_totalprice)@1 as to
1. Datafusion 提供了 target_partitions 配置项(默认为本机的 CPU 核心数量)来配置并行度,在生成单机执行计划时会插入 RepartitionExec 算子来调整 partition 数量。由于表数据分布在 3 个 parquet 文件,
ParquetExec 读取后输出 3 个 partition,因此在这里插入 RepartitionExec 算子将 partition 数量从 3 个提高到 16 个。(在分布式环境下为了利用多个机器,支持更高的并行度,Ballista 提供了可以给每个 session 手动配置此项的支持)
2. Datafusion 提供了 repartition_joins 开关项,基于 join key 进行 hash repartition 后可并行执行 hash join。
3. 这里其实还有优化空间,两个连续的 RepartitionExec 算子可以合并成一个(见 [issue-9370](https://github.com/apache/datafusion/issues/9370))。


**生成初步的分布式执行计划**
Expand All @@ -52,6 +54,8 @@ ShuffleWriterExec: None
UnresolvedShuffleExec
```
1. ResolvedStage 代表当前 stage 可以立即执行,UnResolvedStage 代表当前 stage 依赖的前置 stage 还没执行完毕。
2. join 的两个子树因为执行了 repartition 操作,因此会生成两个 stage:stage1 和 stage2。
3. 最终会在树的 root 那里再生成一个 stage:stage3,它依赖 stage1 和 stage2。

在 stage1 和 stage2 执行完毕后,stage3 会更新成如下
```
Expand All @@ -70,9 +74,11 @@ UnsolvedShuffleExec 会被 ShuffleReaderExec 算子替代。

为什么会生成这样的分布式执行计划?
1. Ballista 会在执行 repartition 的算子(如 RepartitionExec/CoalescePartitionsExec/SortPreservingMergeExec 算子,也被称为 pipeline breaker)那里插入 shuffle 算子,将单机执行计划分割成多个 stage,每个 stage 内部所有算子均为相同的分区方案。
2. 每个 stage 最终都会通过 ShuffleWriterExec 算子将执行结果 repartition 并写入本地磁盘。
2. 每个 stage 最终都会通过 ShuffleWriterExec 算子对执行结果 repartition (如有需要)并写入本地磁盘。
3. 每个有前置依赖的 stage 都会从 ShuffleReaderExec 算子开始执行,ShuffleReaderExec 算子负责读取前置 stage 产生的中间执行结果。

## Shuffle 算子

**ShuffleWriterExec 算子**
```rust
pub struct ShuffleWriterExec {
Expand All @@ -92,7 +98,7 @@ pub struct ShuffleWriterExec {
```

1. work_dir 在生成分布式执行计划时为空,等到实际执行时,会被替换为 executor 的 work_dir。
2. 最终每个 partition 数据以 Arrow IPC 格式存储
2. 最终每个 stage 输出的每个 partition 数据以 Arrow IPC 格式存储
- 当不做 repartition 时,数据存储在 `<work_dir>/<job_id>/<stage_id>/<partition>/data.arrow`
- 当需要 repartition 时,数据存储在 `<work_dir>/<job_id>/<stage_id>/<output_partition>/data-<input_partition>.arrow`

Expand Down Expand Up @@ -121,5 +127,3 @@ pub struct UnresolvedShuffleExec {
}
```
主要起到占位符作用,等前置 stage 执行完毕后,UnresolvedShuffleExec 算子会被实际的 ShuffleReaderExec 算子替换。


0 comments on commit bbb39b7

Please sign in to comment.