Skip to content

Commit

Permalink
Qualification tool: Detect RDD Api's in SQL plan (#3819)
Browse files Browse the repository at this point in the history
* detect RDD API in Sql plan

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* update readme

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* update docs

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
  • Loading branch information
nartal1 authored Oct 18, 2021
1 parent 08820c0 commit e6a69f1
Show file tree
Hide file tree
Showing 18 changed files with 48 additions and 42 deletions.
16 changes: 8 additions & 8 deletions docs/spark-profiling-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ stage. Jobs and SQL are not color coordinated.
#### B. Analysis
- Job + Stage level aggregated task metrics
- SQL level aggregated task metrics
- SQL duration, application during, if it contains a Dataset operation, potential problems, executor CPU time percent
- SQL duration, application during, if it contains Dataset or RDD operation, potential problems, executor CPU time percent
- Shuffle Skew Check: (When task's Shuffle Read Size > 3 * Avg Stage-level size)
Below we will aggregate the task level metrics at different levels
Expand Down Expand Up @@ -382,15 +382,15 @@ SQL level aggregated task metrics:
|1 |application_1111111111111_0001|0 |show at <console>:11|1111 |222222 |6666666 |55555555 |55.55 |0 |13333333 |111111 |999 |3333.3 |6666666 |55555 |66666 |11111111 |0 |111111111111 |11111111111 |111111 |0 |0 |0 |888888888 |8 |11111 |11111 |99999 |11111111111 |2222222 |222222222222 |0 |222222222222 |444444444444 |5555555 |444444 |
```
- SQL duration, application during, if it contains a Dataset operation, potential problems, executor CPU time percent:
- SQL duration, application during, if it contains Dataset or RDD operation, potential problems, executor CPU time percent:
```
SQL Duration and Executor CPU Time Percent
+--------+------------------------------+-----+------------+-------------------+------------+------------------+-------------------------+
|appIndex|App ID |sqlID|SQL Duration|Contains Dataset Op|App Duration|Potential Problems|Executor CPU Time Percent|
+--------+------------------------------+-----+------------+-------------------+------------+------------------+-------------------------+
|1 |application_1603128018386_7759|0 |11042 |false |119990 |null |68.48 |
+--------+------------------------------+-----+------------+-------------------+------------+------------------+-------------------------+
+--------+------------------------------+-----+------------+--------------------------+------------+------------------+-------------------------+
|appIndex|App ID |sqlID|SQL Duration|Contains Dataset or RDD Op|App Duration|Potential Problems|Executor CPU Time Percent|
+--------+------------------------------+-----+------------+--------------------------+------------+------------------+-------------------------+
|1 |application_1603128018386_7759|0 |11042 |false |119990 |null |68.48 |
+--------+------------------------------+-----+------------+--------------------------+------------+------------------+-------------------------+
```
- Shuffle Skew Check:
Expand Down Expand Up @@ -520,4 +520,4 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
All the metrics definitions can be found in the
[executor task metrics doc](https://spark.apache.org/docs/latest/monitoring.html#executor-task-metrics) /
[executor metrics doc](https://spark.apache.org/docs/latest/monitoring.html#executor-metrics) or
the [SPARK webUI doc](https://spark.apache.org/docs/latest/web-ui.html#content).
the [SPARK webUI doc](https://spark.apache.org/docs/latest/web-ui.html#content).
3 changes: 3 additions & 0 deletions docs/spark-qualification-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ The score is based on the total time spent in tasks of SQL Dataframe operations.
The tool also looks for read data formats and types that the plugin doesn't fully support and if it finds any,
it will take away from the score. The parameter to control this negative impact of the
score is `-r, --read-score-percent` with the default value as 20(percent).
Each application(event log) could have multiple SQL queries. If a SQL's plan has a Dataset API or RDD call
inside of it, that SQL query is not categorized as a Dataframe SQL query. We are unable to determine how much
of that query is made up of Dataset or RDD calls so the entire query task time is not included in the score.

The idea behind this algorithm is that the longer the total task time doing SQL Dataframe operations
the higher the score is and the more likely the plugin will be able to help accelerate that application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
// Potential problems not properly track, add it later
SQLDurationExecutorTimeProfileResult(app.index, app.appId, sqlId, sqlCase.duration,
sqlCase.hasDataset, app.appInfo.duration, sqlCase.problematic,
sqlCase.hasDatasetOrRDD, app.appInfo.duration, sqlCase.problematic,
sqlCase.sqlCpuTimePercent)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class SQLExecutionInfoClass(
val startTime: Long,
var endTime: Option[Long],
var duration: Option[Long],
var hasDataset: Boolean,
var hasDatasetOrRDD: Boolean,
var problematic: String = "",
var sqlCpuTimePercent: Double = -1)

Expand Down Expand Up @@ -526,7 +526,7 @@ case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String, sq
duration: Option[Long], containsDataset: Boolean, appDuration: Option[Long],
potentialProbs: String, executorCpuRatio: Double) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "App ID", "sqlID", "SQL Duration",
"Contains Dataset Op", "App Duration", "Potential Problems", "Executor CPU Time Percent")
"Contains Dataset or RDD Op", "App Duration", "Potential Problems", "Executor CPU Time Percent")
val durStr = duration match {
case Some(dur) => dur.toString
case None => ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ abstract class AppBase(
logInfo(s"Total number of events parsed: $totalNumEvents for ${eventlog.toString}")
}

protected def isDataSetPlan(desc: String): Boolean = {
protected def isDataSetOrRDDPlan(desc: String): Boolean = {
desc match {
case l if l.matches(".*\\$Lambda\\$.*") => true
case a if a.endsWith(".apply") => true
case r if r.matches(".*SerializeFromObject.*") => true
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,13 @@ class ApplicationInfo(
val allnodes = planGraph.allNodes
for (node <- allnodes) {
checkGraphNodeForBatchScan(sqlID, node)
if (isDataSetPlan(node.desc)) {
if (isDataSetOrRDDPlan(node.desc)) {
sqlIdToInfo.get(sqlID).foreach { sql =>
sql.hasDataset = true
sql.hasDatasetOrRDD = true
}
if (gpuMode) {
val thisPlan = UnsupportedSQLPlan(sqlID, node.id, node.name, node.desc,
"Contains Dataset")
"Contains Dataset or RDD")
unsupportedSQLplan += thisPlan
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ class EventsProcessor() extends EventProcessorBase with Logging {
event.time,
None,
None,
hasDataset = false,
hasDatasetOrRDD = false,
""
)
app.sqlIdToInfo.put(event.executionId, sqlExecution)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class QualAppInfo(

val sqlIDtoProblematic: HashMap[Long, Set[String]] = HashMap[Long, Set[String]]()

// SQL containing any Dataset operation
val sqlIDToDataSetCase: HashSet[Long] = HashSet[Long]()
// SQL containing any Dataset operation or RDD to DataSet/DataFrame operation
val sqlIDToDataSetOrRDDCase: HashSet[Long] = HashSet[Long]()

val notSupportFormatAndTypes: HashMap[String, Set[String]] = HashMap[String, Set[String]]()

Expand Down Expand Up @@ -115,19 +115,19 @@ class QualAppInfo(
// for the SQL dataframe duration
private def calculateSqlDataframeDuration: Long = {
sqlDurationTime.filterNot { case (sqlID, dur) =>
sqlIDToDataSetCase.contains(sqlID) || dur == -1
sqlIDToDataSetOrRDDCase.contains(sqlID) || dur == -1
}.values.sum
}

private def probNotDataset: HashMap[Long, Set[String]] = {
sqlIDtoProblematic.filterNot { case (sqlID, _) => sqlIDToDataSetCase.contains(sqlID) }
sqlIDtoProblematic.filterNot { case (sqlID, _) => sqlIDToDataSetOrRDDCase.contains(sqlID) }
}

// The total task time for all tasks that ran during SQL dataframe
// operations. if the SQL contains a dataset, it isn't counted.
private def calculateTaskDataframeDuration: Long = {
val validSums = sqlIDToTaskEndSum.filterNot { case (sqlID, _) =>
sqlIDToDataSetCase.contains(sqlID) || sqlDurationTime.getOrElse(sqlID, -1) == -1
sqlIDToDataSetOrRDDCase.contains(sqlID) || sqlDurationTime.getOrElse(sqlID, -1) == -1
}
validSums.values.map(dur => dur.totalTaskDuration).sum
}
Expand Down Expand Up @@ -164,7 +164,7 @@ class QualAppInfo(

private def calculateCpuTimePercent: Double = {
val validSums = sqlIDToTaskEndSum.filterNot { case (sqlID, _) =>
sqlIDToDataSetCase.contains(sqlID) || sqlDurationTime.getOrElse(sqlID, -1) == -1
sqlIDToDataSetOrRDDCase.contains(sqlID) || sqlDurationTime.getOrElse(sqlID, -1) == -1
}
val totalCpuTime = validSums.values.map { dur =>
dur.executorCPUTime
Expand Down Expand Up @@ -252,8 +252,8 @@ class QualAppInfo(
val allnodes = planGraph.allNodes
for (node <- allnodes) {
checkGraphNodeForBatchScan(sqlID, node)
if (isDataSetPlan(node.desc)) {
sqlIDToDataSetCase += sqlID
if (isDataSetOrRDDPlan(node.desc)) {
sqlIDToDataSetOrRDDCase += sqlID
}
val issues = findPotentialIssues(node.desc)
if (issues.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
appIndex,App ID,sqlID,SQL Duration,Contains Dataset Op,App Duration,Potential Problems,Executor CPU Time Percent
1,application_1603128018386_7759,0,11042,false,119990,"",68.48
appIndex,App ID,sqlID,SQL Duration,Contains Dataset or RDD Op,App Duration,Potential Problems,Executor CPU Time Percent
1,application_1603128018386_7759,0,11042,true,119990,"",68.48
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
appIndex,sqlID,nodeID,nodeName,nodeDescription,reason
1,0,3,MapElements,MapElements com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1571/993650587@7b,"Contains Dataset"
1,0,4,Filter,Filter com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1569/1828787392@2eb6d3,"Contains Dataset"
1,0,2,SerializeFromObject,"SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromSt","Contains Dataset or RDD"
1,0,3,MapElements,MapElements com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1571/993650587@7b,"Contains Dataset or RDD"
1,0,4,Filter,Filter com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1569/1828787392@2eb6d3,"Contains Dataset or RDD"
1,0,10,SerializeFromObject,"SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromSt","Contains Dataset or RDD"
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Spark shell,local-1623876083964,1417661.00,"",119903,1417661,133857,91.14,false,0,"",20,100.00,"","","",""
Spark shell,local-1623876083964,0.0,"",0,0,133857,0.0,false,0,"",20,100.00,"","","",""
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Spark shell,local-1623876083964,1417661.00,"",119903,1417661,133857,91.14,false,0,"",20,100.00,"","","",""
Spark shell,local-1623876083964,0.0,"",0,0,133857,0,false,0,"",20,100.00,"","","",""
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Rapids Spark Profiling Tool Unit Tests,local-1622043423018,125035.00,"",11128,125035,16319,37.81,false,0,"",20,100.00,"",JSON,"",""
Rapids Spark Profiling Tool Unit Tests,local-1623281204390,3732.80,UDF,2032,4666,6240,46.27,false,577,"",20,0.00,"JSON[*]",JSON,"",""
Rapids Spark Profiling Tool Unit Tests,local-1622043423018,0.0,"",0,0,16319,0.0,false,0,"",20,100.00,"",JSON,"",""
Rapids Spark Profiling Tool Unit Tests,local-1621966649543,0.00,"",0,0,10650,0.0,false,0,"",20,100.00,"",JSON,"",""
Rapids Spark Profiling Tool Unit Tests,local-1621955976602,0.00,"",0,0,10419,0.0,false,0,"",20,100.00,"",JSON,"",""
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Spark shell,local-1624892957956,37581.00,"",3751,37581,17801,58.47,false,0,"",20,100.00,"","","",""
Spark shell,local-1634253215009,335.07,"",1520,359,47063,67.64,false,0,"",20,66.67,Text[*],"","",""
Binary file modified tools/src/test/resources/spark-events-profiling/spark2-eventlog.zstd
100755 → 100644
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ class ApplicationInfoSuite extends FunSuite with Logging {
assert(apps.size == 1)
assert(apps.head.sparkVersion.equals("2.2.3"))
assert(apps.head.gpuMode.equals(false))
assert(apps.head.jobIdToInfo.keys.toSeq.size == 1)
assert(apps.head.jobIdToInfo.keys.toSeq.size == 6)
assert(apps.head.jobIdToInfo.keys.toSeq.contains(0))
val stage0 = apps.head.stageIdToInfo.get((0, 0))
assert(stage0.isDefined)
assert(stage0.get.info.numTasks.equals(6))
assert(stage0.get.info.numTasks.equals(1))
}

test("test no sql eventlog") {
Expand Down Expand Up @@ -516,7 +516,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
val execInfo = collect.getExecutorInfo
assert(execInfo.size == 1)
assert(execInfo.head.numExecutors === 1)
assert(execInfo.head.maxMem === 16991335219L)
assert(execInfo.head.maxMem === 384093388L)
}

test("test executor info cluster mode") {
Expand Down Expand Up @@ -612,7 +612,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f =>
f.endsWith(".csv")
})
assert(dotDirs.length === 11)
assert(dotDirs.length === 12)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
Expand Down Expand Up @@ -642,7 +642,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f =>
f.endsWith(".csv")
})
assert(dotDirs.length === 9)
assert(dotDirs.length === 10)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ class CompareSuite extends FunSuite {
val compare = new CompareApplications(apps)
val (matchingSqlIdsRet, matchingStageIdsRet) = compare.findMatchingStages()
// none match
assert(matchingSqlIdsRet.size === 29)
assert(matchingSqlIdsRet.size === 32)
assert(matchingSqlIdsRet.head.outputHeaders.size == 2)
assert(matchingSqlIdsRet.head.rows.size == 2)
assert(matchingStageIdsRet.size === 73)
assert(matchingStageIdsRet.size === 75)
assert(matchingStageIdsRet.head.outputHeaders.size == 2)
assert(matchingStageIdsRet.head.rows.size == 2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
val (exit, appSum) = QualificationMain.mainInternal(appArgs)
assert(exit == 0)
assert(appSum.size == 4)
assert(appSum.head.appId.equals("local-1622043423018"))
assert(appSum.head.appId.equals("local-1623281204390"))

val filename = s"$outpath/rapids_4_spark_qualification_output/" +
s"rapids_4_spark_qualification_output.log"
Expand Down Expand Up @@ -153,7 +153,7 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
val (exit, appSum) = QualificationMain.mainInternal(appArgs)
assert(exit == 0)
assert(appSum.size == 4)
assert(appSum.head.appId.equals("local-1622043423018"))
assert(appSum.head.appId.equals("local-1623281204390"))

val filename = s"$outpath/rapids_4_spark_qualification_output/" +
s"rapids_4_spark_qualification_output.log"
Expand All @@ -164,7 +164,7 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(lines.size == (4 + 4))
// skip the 3 header lines
val firstRow = lines(3)
assert(firstRow.contains("local-1622043423018"))
assert(firstRow.contains("local-1623281204390"))
} finally {
inputSource.close()
}
Expand Down

0 comments on commit e6a69f1

Please sign in to comment.