在 Overview 里我们初步介绍了 DAG 型的物理执行图,里面包含 stages 和 tasks。这一章主要解决的问题是:
给定 job 的逻辑执行图,如何生成物理执行图(也就是 stages 和 tasks)?
代码贴在本章最后。给定这样一个复杂数据依赖图,如何合理划分 stage,并确定 task 的类型和个数?
一个直观想法是将前后关联的 RDDs 组成一个 stage,每个箭头生成一个 task。对于两个 RDD 聚合成一个 RDD 的情况,这三个 RDD 组成一个 stage。这样虽然可以解决问题,但显然效率不高。除了效率问题,这个想法还有一个更严重的问题:大量中间数据需要存储。对于 task 来说,其执行结果要么要存到磁盘,要么存到内存,或者两者皆有。如果每个箭头都是 task 的话,每个 RDD 里面的数据都需要存起来,占用空间可想而知。
仔细观察一下逻辑执行图会发现:在每个 RDD 中,每个 partition 是独立的,也就是说在 RDD 内部,每个 partition 的数据依赖各自不会相互干扰。因此,一个大胆的想法是将整个流程图看成一个 stage,为最后一个 finalRDD 中的每个 partition 分配一个 task。图示如下:
所有的粗箭头组合成第一个 task,该 task 计算结束后顺便将 CoGroupedRDD 中已经计算得到的第二个和第三个 partition 存起来。之后第二个 task(细实线)只需计算两步,第三个 task(细虚线)也只需要计算两步,最后得到结果。
这个想法有两个不靠谱的地方:
- 第一个 task 太大,碰到 ShuffleDependency 后,不得不计算 shuffle 依赖的 RDDs 的所有 partitions,而且都在这一个 task 里面计算。
- 需要设计巧妙的算法来判断哪个 RDD 中的哪些 partition 需要 cache。而且 cache 会占用存储空间。
虽然这是个不靠谱的想法,但有一个可取之处,即 pipeline 思想:数据用的时候再算,而且数据是流到要计算的位置的。比如在第一个 task 中,从 FlatMappedValuesRDD 中的 partition 向前推算,只计算要用的(依赖的) RDDs 及 partitions。在第二个 task 中,从 CoGroupedRDD 到 FlatMappedValuesRDD 计算过程中,不需要存储中间结果(MappedValuesRDD 中 partition 的全部数据)。
更进一步,从 record 粒度来讲,如下图中,第一个 pattern 中先算 g(f(record1)),然后原始的 record1 和 f(record1) 都可以丢掉,然后再算 g(f(record2)),丢掉中间结果,最后算 g(f(record3))。对于第二个 pattern 中的 g,record1 进入 g 后,理论上可以丢掉(除非被手动 cache)。其他 pattern 同理。
回到 stage 和 task 的划分问题,上面不靠谱想法的主要问题是碰到 ShuffleDependency 后无法进行 pipeline。那么只要在 ShuffleDependency 处断开,就只剩 NarrowDependency,而 NarrowDependency chain 是可以进行 pipeline 的。按照此思想,上面 ComplexJob 的划分图如下:
所以划分算法就是:从后往前推算,遇到 ShuffleDependency 就断开,遇到 NarrowDependency 就将其加入该 stage。每个 stage 里面 task 的数目由该 stage 最后一个 RDD 中的 partition 个数决定。
粗箭头表示 task。因为是从后往前推算,因此最后一个 stage 的 id 是 0,stage 1 和 stage 2 都是 stage 0 的 parents。**如果 stage 最后要产生 result,那么该 stage 里面的 task 都是 ResultTask,否则都是 ShuffleMapTask。**之所以称为 ShuffleMapTask 是因为其计算结果需要 shuffle 到下一个 stage,本质上相当于 MapReduce 中的 mapper。ResultTask 相当于 MapReduce 中的 reducer(如果需要从 parent stage 那里 shuffle 数据),也相当于普通 mapper(如果该 stage 没有 parent stage)。
还有一个问题:算法中提到 NarrowDependency chain 可以 pipeline,可是这里的 ComplexJob 只展示了 OneToOneDependency 和 RangeDependency 的 pipeline,普通 NarrowDependency 如何 pipeline?
回想上一章里面 cartesian(otherRDD) 里面复杂的 NarrowDependency,图示如下:
经过算法划分后结果如下:
图中粗箭头展示了第一个 ResultTask,其他的 task 依此类推。由于该 stage 的 task 直接输出 result,所以这个图包含 6 个 ResultTasks。与 OneToOneDependency 不同的是这里每个 ResultTask 需要计算 3 个 RDD,读取两个 data block,而整个读取和计算这三个 RDD 的过程在一个 task 里面完成。当计算 CartesianRDD 中的 partition 时,需要从两个 RDD 获取 records,由于都在一个 task 里面,不需要 shuffle。这个图说明:不管是 1:1 还是 N:1 的 NarrowDependency,只要是 NarrowDependency chain,就可以进行 pipeline,生成的 task 个数与该 stage 最后一个 RDD 的 partition 个数相同。
生成了 stage 和 task 以后,下一个问题就是 task 如何执行来生成最后的 result?
回到 ComplexJob 的物理执行图,如果按照 MapReduce 的逻辑,从前到后执行,map() 产生中间数据 map outpus,经过 partition 后放到本地磁盘。再经过 shuffle-sort-aggregate 后生成 reduce inputs,最后 reduce() 执行得到 result。执行流程如下:
整个执行流程没有问题,但不能直接套用在 Spark 的物理执行图上,因为 MapReduce 的流程图简单、固定,而且没有 pipeline。
回想 pipeline 的思想是 数据用的时候再算,而且数据是流到要计算的位置的。Result 产生的地方的就是要计算的位置,要确定 “需要计算的数据”,我们可以从后往前推,需要哪个 partition 就计算哪个 partition,如果 partition 里面没有数据,就继续向前推,形成 computing chain。这样推下去,结果就是:需要首先计算出每个 stage 最左边的 RDD 中的某些 partition。
**对于没有 parent stage 的 stage,该 stage 最左边的 RDD 是可以立即计算的,而且每计算出一个 record 后便可以流入 f 或 g(见前面图中的 patterns)。**如果 f 中的 record 关系是 1:1 的,那么 f(record1) 计算结果可以立即顺着 computing chain 流入 g 中。如果 f 的 record 关系是 N:1,record1 进入 f() 后也可以被回收。总结一下,computing chain 从后到前建立,而实际计算出的数据从前到后流动,而且计算出的第一个 record 流动到不能再流动后,再计算下一个 record。这样,虽然是要计算后续 RDD 的 partition 中的 records,但并不是要求当前 RDD 的 partition 中所有 records 计算得到后再整体向后流动。
对于有 parent stage 的 stage,先等着所有 parent stages 中 final RDD 中数据计算好,然后经过 shuffle 后,问题就又回到了计算 “没有 parent stage 的 stage”。
代码实现:每个 RDD 包含的 getDependency() 负责确立 RDD 的数据依赖,compute() 方法负责接收 parent RDDs 或者 data block 流入的 records,进行计算,然后输出 record。经常可以在 RDD 中看到这样的代码
firstParent[T].iterator(split, context).map(f)
。firstParent 表示该 RDD 依赖的第一个 parent RDD,iterator() 表示 parentRDD 中的 records 是一个一个流入该 RDD 的,map(f) 表示每流入一个 recod 就对其进行 f(record) 操作,输出 record。为了统一接口,这段 compute() 仍然返回一个 iterator,来迭代 map(f) 输出的 records。
总结一下:整个 computing chain 根据数据依赖关系自后向前建立,遇到 ShuffleDependency 后形成 stage。在每个 stage 中,每个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records 一个个 fetch 过来。
如果要自己设计一个 RDD,那么需要注意的是 compute() 只负责定义 parent RDDs => output records 的计算逻辑,具体依赖哪些 parent RDDs 由 getDependency()
定义,具体依赖 parent RDD 中的哪些 partitions 由 dependency.getParents()
定义。
例如,在 CartesianRDD 中,
// RDD x = (RDD a).cartesian(RDD b)
// 定义 RDD x 应该包含多少个 partition,每个 partition 是什么类型
override def getPartitions: Array[Partition] = {
// create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
// 定义 RDD x 中的每个 partition 怎么计算得到
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
// s1 表示 RDD x 中的 partition 依赖 RDD a 中的 partitions(这里只依赖一个)
// s2 表示 RDD x 中的 partition 依赖 RDD b 中的 partitions(这里只依赖一个)
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
// 定义 RDD x 中的 partition i 依赖于哪些 RDD 中的哪些 partitions
//
// 这里 RDD x 依赖于 RDD a,同时依赖于 RDD b,都是 NarrowDependency
// 对于第一个依赖,RDD x 中的 partition i 依赖于 RDD a 中的
// 第 List(i / numPartitionsInRdd2) 个 partition
// 对于第二个依赖,RDD x 中的 partition i 依赖于 RDD b 中的
// 第 List(id % numPartitionsInRdd2) 个 partition
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
},
new NarrowDependency(rdd2) {
def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
}
)
前面介绍了逻辑和物理执行图的生成原理,那么,怎么触发 job 的生成?已经介绍了 task,那么 job 是什么?
下表列出了可以触发执行图生成的典型 action(),其中第二列是 processPartition()
,定义如何计算 partition 中的 records 得到 result。第三列是 resultHandler()
,定义如何对从各个 partition 收集来的 results 进行计算来得到最终结果。
Action | finalRDD(records) => result | compute(results) |
---|---|---|
reduce(func) | (record1, record2) => result, (result, record i) => result | (result1, result 2) => result, (result, result i) => result |
collect() | Array[records] => result | Array[result] |
count() | count(records) => result | sum(result) |
foreach(f) | f(records) => result | Array[result] |
take(n) | record (i<=n) => result | Array[result] |
first() | record 1 => result | Array[result] |
takeSample() | selected records => result | Array[result] |
takeOrdered(n, [ordering]) | TopN(records) => result | TopN(results) |
saveAsHadoopFile(path) | records => write(records) | null |
countByKey() | (K, V) => Map(K, count(K)) | (Map, Map) => Map(K, count(K)) |
用户的 driver 程序中一旦出现 action(),就会生成一个 job,比如 foreach()
会调用sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
,向 DAGScheduler 提交 job。如果 driver 程序后面还有 action(),那么其他 action() 也会生成 job 提交。所以,driver 有多少个 action(),就会生成多少个 job。这就是 Spark 称 driver 程序为 application(可能包含多个 job)而不是 job 的原因。
每一个 job 包含 n 个 stage,最后一个 stage 产生 result。比如,第一章的 GroupByTest 例子中存在两个 job,一共产生了两组 result。在提交 job 过程中,DAGScheduler 会首先划分 stage,然后先提交无 parent stage 的 stages,并在提交过程中确定该 stage 的 task 个数及类型,并提交具体的 task。无 parent stage 的 stage 提交完后,依赖该 stage 的 stage 才能够提交。从 stage 和 task 的执行角度来讲,一个 stage 的 parent stages 执行完后,该 stage 才能执行。
下面简单分析下 job 的生成和提交代码,提交过程在 Architecture 那一章也会有图文并茂的分析:
- rdd.action() 会调用
DAGScheduler.runJob(rdd, processPartition, resultHandler)
来生成 job。 - runJob() 会首先通过
rdd.getPartitions()
来得到 finalRDD 中应该存在的 partition 的个数和类型:Array[Partition]。然后根据 partition 个数 new 出来将来要持有 result 的数组Array[Result](partitions.size)
。 - 最后调用 DAGScheduler 的
runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler)
来提交 job。cleanedFunc 是 processParittion 经过闭包清理后的结果,这样可以被序列化后传递给不同节点的 task。 - DAGScheduler 的 runJob 继续调用
submitJob(rdd, func, partitions, allowLocal, resultHandler)
来提交 job。 - submitJob() 首先得到一个 jobId,然后再次包装 func,向 DAGSchedulerEventProcessActor 发送 JobSubmitted 信息,该 actor 收到信息后进一步调用
dagScheduler.handleJobSubmitted()
来处理提交的 job。之所以这么麻烦,是为了符合事件驱动模型。 - handleJobSubmmitted() 首先调用 finalStage = newStage() 来划分 stage,然后submitStage(finalStage)。由于 finalStage 可能有 parent stages,实际先提交 parent stages,等到他们执行完,finalStage 需要再次提交执行。再次提交由 handleJobSubmmitted() 最后的 submitWaitingStages() 负责。
分析一下 newStage() 如何划分 stage:
- 该方法在 new Stage() 的时候会调用 finalRDD 的 getParentStages()。
- getParentStages() 从 finalRDD 出发,反向 visit 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 加入到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage。
- 一个 ShuffleMapStage(不是最后形成 result 的 stage)形成后,会将该 stage 最后一个 RDD 注册到
MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
,这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置。
分析一下 submitStage(stage) 如何提交 stage 和 task:
- 先确定该 stage 的 missingParentStages,使用
getMissingParentStages(stage)
。如果 parentStages 都可能已经执行过了,那么就为空了。 - 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
- 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用
submitMissingTasks(stage, jobId)
来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用taskScheduler.submitTasks(taskSet)
来提交一整个 taskSet。 - 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给
schedulableBuilder.addTaskSetManager(manager)
。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步是通知backend.reviveOffers()
去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。 - SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子类,
backend.reviveOffers()
其实是向 DriverActor 发送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的时候,会启动 DriverActor。DriverActor 收到 ReviveOffers 消息后,会调用launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
来 launch tasks。scheduler 就是 TaskSchedulerImpl。scheduler.resourceOffers()
从 FIFO 或者 Fair 调度器那里获得排序后的 TaskSetManager,并经过TaskSchedulerImpl.resourceOffer()
,考虑 locality 等因素来确定 task 的全部信息 TaskDescription。调度细节这里暂不讨论。 - DriverActor 中的 launchTasks() 将每个 task 序列化,如果序列化大小不超过 Akka 的 akkaFrameSize,那么直接将 task 送到 executor 那里执行
executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
。
至此,我们讨论了:
- driver 程序如何触发 job 的生成
- 如何从逻辑执行图得到物理执行图
- pipeline 思想与实现
- 生成与提交 job 的实际代码
还有很多地方没有深入讨论,如:
- 连接 stage 的 shuffle 过程
- task 运行过程及运行位置
下一章重点讨论 shuffle 过程。
从逻辑执行图的建立,到将其转换成物理执行图的过程很经典,过程中的 dependency 划分,pipeline,stage 分割,task 生成 都是有条不紊,有理有据的。
package internals
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
object complexJob {
def main(args: Array[String]) {
val sc = new SparkContext("local", "ComplexJob test")
val data1 = Array[(Int, Char)](
(1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
(5, 'e'), (3, 'f'),
(2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)
val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))
val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
(3, "C"), (4, "D"))
val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))
val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)
val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)
result.foreachWith(i => i)((x, i) => println("[result " + i + "] " + x))
println(result.toDebugString)
}
}