Skip to content

Commit

Permalink
Qualification tool: Parsing Execs to get the ExecInfo #2 (#5426)
Browse files Browse the repository at this point in the history
* Qualification tool: Parsing Execs to get the ExecInfo #2

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
  • Loading branch information
nartal1 authored May 5, 2022
1 parent 0004be3 commit c1aebfc
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Parser for a `CartesianProduct` node of a SQL plan graph.
 *
 * The node name suffixed with "Exec" is looked up in the [[PluginTypeChecker]]. When the
 * checker reports that exec as supported, its speedup factor is used; otherwise the factor
 * is 1 and the exec is flagged as unsupported.
 *
 * @param node    plan graph node to parse
 * @param checker source of the supported-exec list and the speedup factors
 * @param sqlID   id of the SQL query this node belongs to
 */
case class CartesianProductExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  // Name used for all checker lookups: the graph node name plus the "Exec" suffix.
  val fullExecName = s"${node.name}Exec"

  /** Returns a one-element sequence holding the [[ExecInfo]] built for this node. */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    // The checker is only asked for a factor when the exec is supported.
    val factor = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // CartesianProduct doesn't have duration, so None is passed in the duration slot.
    Seq(ExecInfo(sqlID, node.name, "", factor, None, node.id, supported, None))
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Parser for a `Generate` node of a SQL plan graph.
 *
 * Produces exactly one [[ExecInfo]]: supported with the checker's speedup factor when the
 * checker knows `fullExecName`, unsupported with a factor of 1 otherwise.
 *
 * @param node    plan graph node to parse
 * @param checker source of the supported-exec list and the speedup factors
 * @param sqlID   id of the SQL query this node belongs to
 */
case class GenerateExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  // Graph node name with the "Exec" suffix, as expected by the checker lookups.
  val fullExecName = s"${node.name}Exec"

  /** Builds the single-element result describing this node. */
  override def parse: Seq[ExecInfo] = {
    // Generate doesn't have duration, so the duration slot is always None.
    // TODO - add in parsing expressions - average speedup across?
    def infoFor(speedup: Int, supported: Boolean): ExecInfo =
      ExecInfo(sqlID, node.name, "", speedup, None, node.id, supported, None)

    val result =
      if (checker.isExecSupported(fullExecName)) {
        infoFor(checker.getSpeedupFactor(fullExecName), supported = true)
      } else {
        infoFor(1, supported = false)
      }
    Seq(result)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Parser for a `GlobalLimit` node of a SQL plan graph.
 *
 * Asks the [[PluginTypeChecker]] whether `fullExecName` is supported and, if so, which
 * speedup factor applies. Unsupported execs are reported with a factor of 1.
 *
 * @param node    plan graph node to parse
 * @param checker source of the supported-exec list and the speedup factors
 * @param sqlID   id of the SQL query this node belongs to
 */
case class GlobalLimitExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  // Lookup key for the checker: node name followed by "Exec".
  val fullExecName = s"${node.name}Exec"

  /** Returns a one-element sequence with the [[ExecInfo]] for this node. */
  override def parse: Seq[ExecInfo] = {
    val execSupported = checker.isExecSupported(fullExecName)
    val speedup = execSupported match {
      case true => checker.getSpeedupFactor(fullExecName)
      case false => 1
    }
    // GlobalLimit doesn't have duration
    val noDuration = None
    // TODO - add in parsing expressions - average speedup across?
    val info = ExecInfo(sqlID, node.name, "", speedup, noDuration, node.id, execSupported, None)
    Seq(info)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Parser for a `LocalLimit` node of a SQL plan graph.
 *
 * The result is a single [[ExecInfo]] whose support flag and speedup factor come from the
 * [[PluginTypeChecker]]; an exec the checker does not support gets a factor of 1.
 *
 * @param node    plan graph node to parse
 * @param checker source of the supported-exec list and the speedup factors
 * @param sqlID   id of the SQL query this node belongs to
 */
case class LocalLimitExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  // Name the checker is queried with: the node name plus "Exec".
  val fullExecName = s"${node.name}Exec"

  /** Returns the parsed information for this node as a one-element sequence. */
  override def parse: Seq[ExecInfo] = {
    val isKnown = checker.isExecSupported(fullExecName)
    // Only supported execs have a factor to look up; everything else stays at 1.
    val factor =
      if (!isKnown) {
        1
      } else {
        checker.getSpeedupFactor(fullExecName)
      }
    // TODO - add in parsing expressions - average speedup across?
    // LocalLimit doesn't have duration, hence None for that argument.
    Seq(ExecInfo(sqlID, node.name, "", factor, None, node.id, isKnown, None))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ object SQLPlanParser extends Logging {
app: AppBase
): Seq[ExecInfo] = {
node match {
case c if (c.name == "CartesianProduct") =>
CartesianProductExecParser(c, checker, sqlID).parse
case c if (c.name == "Coalesce") =>
CoalesceExecParser(c, checker, sqlID).parse
case c if (c.name == "CollectLimit") =>
Expand All @@ -88,12 +90,20 @@ object SQLPlanParser extends Logging {
ExpandExecParser(e, checker, sqlID).parse
case f if (f.name == "Filter") =>
FilterExecParser(f, checker, sqlID).parse
case g if (g.name == "Generate") =>
GenerateExecParser(g, checker, sqlID).parse
case g if (g.name == "GlobalLimit") =>
GlobalLimitExecParser(g, checker, sqlID).parse
case l if (l.name == "LocalLimit") =>
LocalLimitExecParser(l, checker, sqlID).parse
case p if (p.name == "Project") =>
ProjectExecParser(p, checker, sqlID).parse
case r if (r.name == "Range") =>
RangeExecParser(r, checker, sqlID).parse
case s if (s.name == "Sample") =>
SampleExecParser(s, checker, sqlID).parse
case s if (s.name == "Sort") =>
SortExecParser(s, checker, sqlID).parse
case t if (t.name == "TakeOrderedAndProject") =>
TakeOrderedAndProjectExecParser(t, checker, sqlID).parse
case u if (u.name == "Union") =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Parser for a `Sort` node of a SQL plan graph.
 *
 * Reports the node as supported, with the checker's speedup factor, when the
 * [[PluginTypeChecker]] supports `fullExecName`; otherwise as unsupported with a factor of 1.
 *
 * @param node    plan graph node to parse
 * @param checker source of the supported-exec list and the speedup factors
 * @param sqlID   id of the SQL query this node belongs to
 */
case class SortExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  // Node name with the "Exec" suffix; this is the key used for the checker lookups.
  val fullExecName = s"${node.name}Exec"

  /** Returns a one-element sequence containing the [[ExecInfo]] for this node. */
  override def parse: Seq[ExecInfo] = {
    val sortSupported = checker.isExecSupported(fullExecName)
    val sortSpeedup =
      if (sortSupported) checker.getSpeedupFactor(fullExecName)
      else 1
    // Sort doesn't have duration
    val sortDuration = None
    // TODO - add in parsing expressions - average speedup across?
    Seq(
      ExecInfo(sqlID, node.name, "", sortSpeedup, sortDuration, node.id, sortSupported, None))
  }
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.scalatest.{BeforeAndAfterEach, FunSuite}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, TrampolineUtil}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo

class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging {
Expand All @@ -39,7 +39,9 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging {
.getOrCreate()
}

test("WholeStage with Filter and Project") {
private val logDir = ToolTestUtils.getTestResourcePath("spark-events-qualification")

test("WholeStage with Filter, Project and Sort") {
TrampolineUtil.withTempDir { eventLogDir =>
val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "sqlmetric") { spark =>
import spark.implicits._
Expand All @@ -48,6 +50,7 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging {
df.select( $"value" as "a")
.join(df2.select($"value" as "b"), $"a" === $"b")
.filter($"b" < 100)
.sort($"b")
}

TrampolineUtil.withTempDir { outpath =>
Expand All @@ -63,25 +66,33 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(app.sqlPlans.size == 1)
app.sqlPlans.foreach { case(sqlID, plan) =>
val planInfo = SQLPlanParser.parseSQLPlan(plan, sqlID, pluginTypeChecker, app)
assert(planInfo.execInfo.size == 9)
assert(planInfo.execInfo.size == 11)
val wholeStages = planInfo.execInfo.filter(_.exec.contains("WholeStageCodegen"))
assert(wholeStages.size == 5)
// only 2 in the above example have projects and filters
assert(wholeStages.size == 6)
// only 2 in the above example have projects and filters and the other 3 have sort
val numSupported = wholeStages.filter(_.isSupported).size
assert(numSupported == 2)
assert(numSupported == 5)
assert(wholeStages.forall(_.duration.nonEmpty))
val allChildren = wholeStages.flatMap(_.children).flatten
assert(allChildren.size == 9)
val filters = allChildren.filter(_.exec == "FilterExec")
assert(allChildren.size == 10)
val filters = allChildren.filter(_.exec == "Filter")
assert(filters.size == 2)
assert(filters.forall(_.speedupFactor == 2))
assert(filters.forall(_.isSupported == true))
assert(filters.forall(_.children.isEmpty))
assert(filters.forall(_.duration.isEmpty))
val projects = allChildren.filter(_.exec == "ProjectExec")
val projects = allChildren.filter(_.exec == "Project")
assert(projects.size == 2)
assert(projects.forall(_.speedupFactor == 2))
assert(projects.forall(_.isSupported == true))
assert(projects.forall(_.children.isEmpty))
assert(projects.forall(_.duration.isEmpty))
val sorts = allChildren.filter(_.exec == "Sort")
assert(sorts.size == 3)
assert(sorts.forall(_.speedupFactor == 2))
assert(sorts.forall(_.isSupported == true))
assert(sorts.forall(_.children.isEmpty))
assert(sorts.forall(_.duration.isEmpty))
}
}
}
Expand Down Expand Up @@ -136,4 +147,79 @@ class SQLPlanParserSuite extends FunSuite with BeforeAndAfterEach with Logging {
}
}
}

// Verifies that the CartesianProduct and Generate execs are parsed and reported as supported.
// The generated event log is expected to contain two SQL plans (asserted below). The execs are
// collected across all plans, both at the top level and as children of other execs, and each
// exec must show up exactly once. Asserting on the aggregated result (instead of guarding the
// assertions with a per-plan `nonEmpty` check) makes the test fail when an exec is missing
// rather than silently passing without checking anything.
test("Parse Execs - CartesianProduct and Generate") {
  TrampolineUtil.withTempDir { eventLogDir =>
    val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "sqlmetric") { spark =>
      import spark.implicits._
      val genDf = spark.sparkContext.parallelize(List(List(1, 2, 3), List(4, 5, 6)), 4).toDF
      val joinDf1 = spark.sparkContext.makeRDD(1 to 10, 4).toDF
      val joinDf2 = spark.sparkContext.makeRDD(1 to 10, 4).toDF
      // Run the explode query explicitly; the cross join is returned to generateEventLog
      // (presumably executed there - verify against ToolTestUtils).
      genDf.select(explode($"value")).collect()
      joinDf1.crossJoin(joinDf2)
    }
    TrampolineUtil.withTempDir { _ =>
      val hadoopConf = new Configuration()
      val (_, allEventLogs) = EventLogPathProcessor.processAllPaths(
        None, None, List(eventLog), hadoopConf)
      val pluginTypeChecker = new PluginTypeChecker()
      assert(allEventLogs.size == 1)
      val app = QualificationAppInfo
        .createApp(allEventLogs.head, hadoopConf, pluginTypeChecker, 20)
        .getOrElse(fail("QualificationAppInfo.createApp did not return an application"))
      assert(app.sqlPlans.size == 2)

      // Flatten every plan into its top-level execs plus the children nested under them.
      val allExecs = app.sqlPlans.toSeq.flatMap { case (sqlID, plan) =>
        val planInfo = SQLPlanParser.parseSQLPlan(plan, sqlID, pluginTypeChecker, app)
        planInfo.execInfo ++ planInfo.execInfo.flatMap(_.children).flatten
      }

      for (execName <- Seq("CartesianProduct", "Generate")) {
        val supportedExec = allExecs.filter(_.exec == execName)
        assert(supportedExec.size == 1, execName)
        assert(supportedExec.forall(_.children.isEmpty), execName)
        assert(supportedExec.forall(_.duration.isEmpty), execName)
        assert(supportedExec.forall(_.speedupFactor == 2), execName)
        assert(supportedExec.forall(_.isSupported == true), execName)
      }
    }
  }
}

// GlobalLimit and LocalLimit are not in the physical plan when collect is called on the
// dataframe, so these execs are tested by reading a static event log instead.
// Each exec must be found exactly once: the assertions are unconditional so that the test
// fails, rather than passing without checking anything, if an exec is missing from the
// parsed plan.
test("Parse execs - LocalLimit and GlobalLimit") {
  val logFile = s"$logDir/global_local_limit_eventlog.zstd"
  TrampolineUtil.withTempDir { _ =>
    val hadoopConf = new Configuration()
    val (_, allEventLogs) = EventLogPathProcessor.processAllPaths(
      None, None, List(logFile), hadoopConf)
    val pluginTypeChecker = new PluginTypeChecker()
    assert(allEventLogs.size == 1)
    val app = QualificationAppInfo
      .createApp(allEventLogs.head, hadoopConf, pluginTypeChecker, 20)
      .getOrElse(fail("QualificationAppInfo.createApp did not return an application"))
    assert(app.sqlPlans.size == 1)
    val supportedExecs = Seq("GlobalLimit", "LocalLimit")
    app.sqlPlans.foreach { case (sqlID, plan) =>
      val planInfo = SQLPlanParser.parseSQLPlan(plan, sqlID, pluginTypeChecker, app)
      // GlobalLimit and LocalLimit are inside WholeStageCodegen. So getting the children of
      // WholeStageCodegenExec
      val wholeStages = planInfo.execInfo.filter(_.exec.contains("WholeStageCodegen"))
      val allChildren = wholeStages.flatMap(_.children).flatten
      for (execName <- supportedExecs) {
        val supportedExec = allChildren.filter(_.exec == execName)
        assert(supportedExec.size == 1, execName)
        assert(supportedExec.forall(_.children.isEmpty), execName)
        assert(supportedExec.forall(_.duration.isEmpty), execName)
        assert(supportedExec.forall(_.speedupFactor == 2), execName)
        assert(supportedExec.forall(_.isSupported == true), execName)
      }
    }
  }
}
}

0 comments on commit c1aebfc

Please sign in to comment.