diff --git a/markdown/1-Overview.md b/markdown/1-Overview.md index 66525a3..148dd02 100644 --- a/markdown/1-Overview.md +++ b/markdown/1-Overview.md @@ -123,7 +123,7 @@ object GroupByTest { - 执行 RDD 上的 transformation 操作(这里是 flatMap)以后,生成 FlatMappedRDD,其中每个 partition 包含一个 Array[(Int, Array[Byte])]。 - 第一个 count() 执行时,先在每个 partition 上执行 count,然后执行结果被发送到 driver,最后在 driver 端进行 sum。 - 由于 FlatMappedRDD 被 cache 到内存,因此这里将里面的 partition 都换了一种颜色表示。 -- groupByKey 产生了后面三个 RDD,为什么产生这三个在后面章节讨论。 +- groupByKey 产生了后面两个 RDD,为什么产生这两个在后面章节讨论。 - 如果 job 需要 shuffle,一般会产生 ShuffledRDD。该 RDD 与前面的 RDD 的关系类似于 Hadoop 中 mapper 输出数据与 reducer 输入数据之间的关系。 - MapPartitionsRDD 里包含 groupByKey() 的结果。 - 最后将 MapPartitionsRDD 中的 每个value(也就是Array[Byte])都转换成 Iterable 类型。 @@ -173,4 +173,4 @@ Hi,文章写得很赞~关于OverView中如何配置多个Backend进程的问 Backend个数发生变化情况:1、启动一个新的Application(每个APP都会launceExecutor,此时会生成此进程)2、还可以通过设置SPARK\_WORKER\_INSTANCES参数来增加Backend个数。图可以依此稍做改动。 -Backend进程是SparkContext初始化taskcScheduler,taskcScheduler初始化SparkDeploySchedulerBackend里appDesc里的command...顺藤摸瓜即可。。CoarseGrainedExecutorBackend \ No newline at end of file +Backend进程是SparkContext初始化taskcScheduler,taskcScheduler初始化SparkDeploySchedulerBackend里appDesc里的command...顺藤摸瓜即可。。CoarseGrainedExecutorBackend diff --git a/markdown/2-JobLogicalPlan.md b/markdown/2-JobLogicalPlan.md index aec3046..f6ac053 100644 --- a/markdown/2-JobLogicalPlan.md +++ b/markdown/2-JobLogicalPlan.md @@ -138,7 +138,7 @@ val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3) ![reduceyByKey](PNGfigures/reduceByKey.png) -reduceyByKey() 相当于传统的 MapReduce,整个也数据流与 Hadoop 中的数据流基本一样。reduceyByKey() 默认在 map 端开启 combine(),因此在 shuffle 之前先通过 mapPartitions 操作进行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。 +reduceyByKey() 相当于传统的 MapReduce,整个数据流也与 Hadoop 中的数据流基本一样。reduceyByKey() 默认在 map 端开启 combine(),因此在 shuffle 之前先通过 mapPartitions 操作进行 combine,得到 MapPartitionsRDD,然后 shuffle 得到 ShuffledRDD,然后再进行 reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。 **3) distinct(numPartitions)** @@ -242,4 +242,4 @@ RDD 本身的依赖关系由 transformation() 生成的每一个 RDD 本身语 RDD 中 partition 依赖关系分为 NarrowDependency 和 ShuffleDependency。前者是完全伊依赖,后者是部分依赖。NarrowDependency 里面又包含多种情况,只有前后两个 RDD 的 partition 个数以及 partitioner 都一样,才会出现 NarrowDependency。 -从数据处理逻辑的角度来看,MapReduce 相当于 Spark 中的 map() + reduceByKey(),但严格来讲 MapReduce 中的 reduce() 要比 reduceByKey() 的功能强大些,详细差别会在 Shuffle details 一章中继续讨论。 \ No newline at end of file +从数据处理逻辑的角度来看,MapReduce 相当于 Spark 中的 map() + reduceByKey(),但严格来讲 MapReduce 中的 reduce() 要比 reduceByKey() 的功能强大些,详细差别会在 Shuffle details 一章中继续讨论。 diff --git a/markdown/3-JobPhysicalPlan.md b/markdown/3-JobPhysicalPlan.md index 3518826..abe3386 100644 --- a/markdown/3-JobPhysicalPlan.md +++ b/markdown/3-JobPhysicalPlan.md @@ -18,7 +18,7 @@ 所有的粗箭头组合成第一个 task,该 task 计算结束后顺便将 CoGroupedRDD 中已经计算得到的第二个和第三个 partition 存起来。之后第二个 task(细实线)只需计算两步,第三个 task(细虚线)也只需要计算两步,最后得到结果。 -这个想法由两个不靠谱的地方: +这个想法有两个不靠谱的地方: - 第一个 task 太大,碰到 ShuffleDependency 后,不得不计算 shuffle 依赖的 RDDs 的所有 partitions,而且都在这一个 task 里面计算。 - 需要设计巧妙的算法来判断哪个 RDD 中的哪些 partition 需要 cache。而且 cache 会占用存储空间。 @@ -113,7 +113,7 @@ ## 生成 job 前面介绍了逻辑和物理执行图的生成原理,那么,**怎么触发 job 的生成?已经介绍了 task,那么 job 是什么?** -下表列出了可以触发生成执行图生成典型 [action()](http://spark.apache.org/docs/latest/programming-guide.html#actions),其中第二列是 `processPartition()`,定义如何计算 partition 中的 records 得到 result。第三列是 `resultHandler()`,定义如何对从各个 partition 收集来的 results 进行计算来得到最终结果。 +下表列出了可以触发执行图生成的典型 [action()](http://spark.apache.org/docs/latest/programming-guide.html#actions),其中第二列是 `processPartition()`,定义如何计算 partition 中的 records 得到 result。第三列是 `resultHandler()`,定义如何对从各个 partition 收集来的 results 进行计算来得到最终结果。 | Action | finalRDD(records) => result | compute(results) | @@ -122,9 +122,9 @@ | collect() |Array[records] => result | Array[result] | | count() | count(records) => result | sum(result) | | foreach(f) | f(records) => result | Array[result] | -| take(n) | record (i<=n) => result | Array{result] | -| first() | record 1 => result | Array{result] | -| takeSample() | selected records => result | Array{result] | +| take(n) | record (i<=n) => result | Array[result] | +| first() | record 1 => result | Array[result] | +| takeSample() | selected records => result | Array[result] | | takeOrdered(n, [ordering]) | TopN(records) => result | TopN(results) | | saveAsHadoopFile(path) | records => write(records) | null | | countByKey() | (K, V) => Map(K, count(K)) | (Map, Map) => Map(K, count(K)) | @@ -155,7 +155,7 @@ 1. 先确定该 stage 的 missingParentStages,使用`getMissingParentStages(stage)`。如果 parentStages 都可能已经执行过了,那么就为空了。 2. 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。 3. 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用`submitMissingTasks(stage, jobId)`来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用`taskScheduler.submitTasks(taskSet)`来提交一整个 taskSet。 -4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给`schedulableBuilder.addTaskSetManager(manager)`。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步时通知`backend.reviveOffers()`去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。 +4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给`schedulableBuilder.addTaskSetManager(manager)`。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步是通知`backend.reviveOffers()`去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。 5. SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子类,`backend.reviveOffers()`其实是向 DriverActor 发送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的时候,会启动 DriverActor。DriverActor 收到 ReviveOffers 消息后,会调用`launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))` 来 launch tasks。scheduler 就是 TaskSchedulerImpl。`scheduler.resourceOffers()`从 FIFO 或者 Fair 调度器那里获得排序后的 TaskSetManager,并经过`TaskSchedulerImpl.resourceOffer()`,考虑 locality 等因素来确定 task 的全部信息 TaskDescription。调度细节这里暂不讨论。 6. DriverActor 中的 launchTasks() 将每个 task 序列化,如果序列化大小不超过 Akka 的 akkaFrameSize,那么直接将 task 送到 executor 那里执行`executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))`。 @@ -221,4 +221,4 @@ object complexJob { println(result.toDebugString) } } -``` \ No newline at end of file +``` diff --git a/markdown/4-shuffleDetails.md b/markdown/4-shuffleDetails.md index 986194d..9338d58 100644 --- a/markdown/4-shuffleDetails.md +++ b/markdown/4-shuffleDetails.md @@ -182,6 +182,6 @@ ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V ## Discussion 通过本章的介绍可以发现,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加灵活,会根据不同的 transformation() 的语义去设计不同的 shuffle-aggregate 策略,再加上不同的内存数据结构来混搭出合理的执行流程。 -这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle write,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。 +这章主要讨论了 Spark 是怎么在不排序 records 的情况下完成 shuffle write 和 shuffle read,以及怎么将 shuffle 过程融入 RDD computing chain 中的。附带讨论了内存与磁盘的平衡以及与 Hadoop MapReduce shuffle 的异同。下一章将从部署图以及进程通信角度来描述 job 执行的整个流程,也会涉及 shuffle write 和 shuffle read 中的数据位置获取问题。 另外,Jerry Shao 写的 [详细探究Spark的shuffle实现](http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/) 很赞,里面还介绍了 shuffle 过程在 Spark 中的进化史。目前 sort-based 的 shuffle 也在实现当中,stay tuned。