Profiling tool: Update comparison mode output format and add error handling #2762

Merged Jun 23, 2021 (9 commits)
tools/README.md: 74 changes (37 additions & 37 deletions)
@@ -241,29 +241,29 @@ We can input multiple Spark event logs and this tool can compare environments, e
### A. Compare Information Collected ###
```
Compare Application Information:

+--------+-----------------------+-------------+-------------+--------+-----------+------------+-------+
|appIndex|appId |startTime |endTime |duration|durationStr|sparkVersion|gpuMode|
+--------+-----------------------+-------------+-------------+--------+-----------+------------+-------+
|1 |app-20210329165943-0103|1617037182848|1617037490515|307667 |5.1 min |3.0.1 |false |
|2 |app-20210329170243-0018|1617037362324|1617038578035|1215711 |20 min |3.0.1 |true |
+--------+-----------------------+-------------+-------------+--------+-----------+------------+-------+
+--------+-----------+-----------------------+-------------+-------------+--------+-----------+------------+-------+
|appIndex|appName |appId |startTime |endTime |duration|durationStr|sparkVersion|gpuMode|
+--------+-----------+-----------------------+-------------+-------------+--------+-----------+------------+-------+
|1 |Spark shell|app-20210329165943-0103|1617037182848|1617037490515|307667 |5.1 min |3.0.1 |false |
|2 |Spark shell|app-20210329170243-0018|1617037362324|1617038578035|1215711 |20 min |3.0.1 |true |
+--------+-----------+-----------------------+-------------+-------------+--------+-----------+------------+-------+
```

- Compare Executor information:
```
Compare Executor Information:
+--------+----------+----------+-----------+------------+-------------+--------+--------+--------+------------+--------+--------+
|appIndex|executorID|totalCores|maxMem |maxOnHeapMem|maxOffHeapMem|exec_cpu|exec_mem|exec_gpu|exec_offheap|task_cpu|task_gpu|
+--------+----------+----------+-----------+------------+-------------+--------+--------+--------+------------+--------+--------+
|1 |0 |4 |13984648396|13984648396 |0 |null |null |null |null |null |null |
|1 |1 |4 |13984648396|13984648396 |0 |null |null |null |null |null |null |
+--------+------------+----------------+-----------+------------+-------------+--------+--------+--------+------------+--------+--------+
|appIndex|numExecutors|coresPerExecutor|maxMem |maxOnHeapMem|maxOffHeapMem|exec_cpu|exec_mem|exec_gpu|exec_offheap|task_cpu|task_gpu|
+--------+------------+----------------+-----------+------------+-------------+--------+--------+--------+------------+--------+--------+
|1 |2 |4 |47055896576|25581060096 |21474836480 |null |null |null |null |null |null |
|2 |2 |4 |55645831168|12696158208 |42949672960 |null |null |null |null |null |null |
```
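The executor comparison now emits one aggregated row per application (executor count and cores per executor) rather than one row per executor. A rough sketch of such a rollup, with a hypothetical record type and illustrative aggregation choices (the tool itself derives these columns in SQL and may aggregate memory differently):

```scala
// Hypothetical executor record; field names are illustrative only.
case class ExecutorInfo(appIndex: Int, cores: Int, maxMem: Long)

// One summary row per app: (numExecutors, coresPerExecutor, total maxMem).
// The aggregation choices here are assumptions, not the tool's exact SQL.
def summarize(execs: Seq[ExecutorInfo]): Map[Int, (Int, Int, Long)] =
  execs.groupBy(_.appIndex).map { case (app, es) =>
    app -> (es.size, es.head.cores, es.map(_.maxMem).sum)
  }
```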

- Compare Rapids-related Spark properties side-by-side:
```
Compare Rapids Properties which are set explicitly:
+-------------------------------------------+----------+----------+
|key |value_app1|value_app2|
|propertyName |appIndex_1|appIndex_2|
+-------------------------------------------+----------+----------+
|spark.rapids.memory.pinnedPool.size |null |2g |
|spark.rapids.sql.castFloatToDecimal.enabled|null |true |
```

@@ -370,7 +370,7 @@ SQL Duration and Executor CPU Time Percent
```
Shuffle Skew Check: (When task's Shuffle Read Size > 3 * Avg Stage-level size)
+--------+-------+--------------+------+-------+---------------+--------------+-----------------+----------------+----------------+----------+----------------------------------------------------------------------------------------------------+
|appIndex|stageId|stageAttemptId|taskId|attempt|taskDurationSec|avgDurationSec|taskShuffleReadMB|avgShuffleReadMB|taskPeakMemoryMB|successful|endReason_first100char |
|appIndex|stageId|stageAttemptId|taskId|attempt|taskDurationSec|avgDurationSec|taskShuffleReadMB|avgShuffleReadMB|taskPeakMemoryMB|successful|reason |
+--------+-------+--------------+------+-------+---------------+--------------+-----------------+----------------+----------------+----------+----------------------------------------------------------------------------------------------------+
|1 |2 |0 |2222 |0 |111.11 |7.7 |2222.22 |111.11 |0.01 |false |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /dddd/xxxxxxx/ccccc/bbbbbbbbb/aaaaaaa|
|1 |2 |0 |2224 |1 |222.22 |8.8 |3333.33 |111.11 |0.01 |false |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /dddd/xxxxxxx/ccccc/bbbbbbbbb/aaaaaaa|
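```

The header of the check above states the rule directly: a task is flagged when its shuffle read size exceeds three times the stage-level average. A minimal sketch of that predicate (the `TaskRow` shape is hypothetical; the tool computes the equivalent in SQL over `taskDF_<appIndex>`):

```scala
// Hypothetical task record for illustration.
case class TaskRow(stageId: Int, taskId: Long, shuffleReadMB: Double)

// Flag tasks whose shuffle read exceeds 3x their stage's average.
def skewedTasks(tasks: Seq[TaskRow]): Seq[TaskRow] = {
  val avgByStage = tasks.groupBy(_.stageId).map { case (stage, ts) =>
    stage -> ts.map(_.shuffleReadMB).sum / ts.size
  }
  tasks.filter(t => t.shuffleReadMB > 3 * avgByStage(t.stageId))
}
```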
@@ -384,46 +384,46 @@ Below are examples.
- Print failed tasks:
```
Failed tasks:
+-------+--------------+------+-------+----------------------------------------------------------------------------------------------------+
|stageId|stageAttemptId|taskId|attempt|endReason_first100char |
+-------+--------------+------+-------+----------------------------------------------------------------------------------------------------+
|4 |0 |2842 |0 |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /home/jenkins/agent/workspace/jenkins|
|4 |0 |2858 |0 |TaskKilled(another attempt succeeded,List(AccumulableInfo(453,None,Some(22000),None,false,true,None)|
|4 |0 |2884 |0 |TaskKilled(another attempt succeeded,List(AccumulableInfo(453,None,Some(21148),None,false,true,None)|
|4 |0 |2908 |0 |TaskKilled(another attempt succeeded,List(AccumulableInfo(453,None,Some(20420),None,false,true,None)|
|4 |0 |3410 |1 |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /home/jenkins/agent/workspace/jenkins|
+-------+--------------+------+-------+----------------------------------------------------------------------------------------------------+
+--------+-------+--------------+------+-------+----------------------------------------------------------------------------------------------------+
|appIndex|stageId|stageAttemptId|taskId|attempt|failureReason |
+--------+-------+--------------+------+-------+----------------------------------------------------------------------------------------------------+
|3 |4 |0 |2842 |0 |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /home/jenkins/agent/workspace/jenkins|
|3 |4 |0 |2858 |0 |TaskKilled(another attempt succeeded,List(AccumulableInfo(453,None,Some(22000),None,false,true,None)|
|3 |4 |0 |2884 |0 |TaskKilled(another attempt succeeded,List(AccumulableInfo(453,None,Some(21148),None,false,true,None)|
|3 |4 |0 |2908 |0 |TaskKilled(another attempt succeeded,List(AccumulableInfo(453,None,Some(20420),None,false,true,None)|
|3 |4 |0 |3410 |1 |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /home/jenkins/agent/workspace/jenkins|
+--------+-------+--------------+------+-------+----------------------------------------------------------------------------------------------------+
```

- Print failed stages:
```
Failed stages:
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|stageId|attemptId|name |numTasks|failureReason_first100char |
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|4 |0 |attachTree at Spark300Shims.scala:624|1000 |Job 0 cancelled as part of cancellation of all jobs|
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
+--------+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|appIndex|stageId|attemptId|name |numTasks|failureReason |
+--------+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|3 |4 |0 |attachTree at Spark300Shims.scala:624|1000 |Job 0 cancelled as part of cancellation of all jobs|
+--------+-------+---------+-------------------------------------+--------+---------------------------------------------------+
```

- Print failed jobs:
```
Failed jobs:
+-----+---------+------------------------------------------------------------------------+
|jobID|jobResult|failedReason_first100char |
+-----+---------+------------------------------------------------------------------------+
|0 |JobFailed|java.lang.Exception: Job 0 cancelled as part of cancellation of all jobs|
+-----+---------+------------------------------------------------------------------------+
+--------+-----+---------+------------------------------------------------------------------------+
|appIndex|jobID|jobResult|failureReason |
+--------+-----+---------+------------------------------------------------------------------------+
|3 |0 |JobFailed|java.lang.Exception: Job 0 cancelled as part of cancellation of all jobs|
+--------+-----+---------+------------------------------------------------------------------------+
```
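Each of these failure views now carries a leading `appIndex` column, so rows from different event logs remain distinguishable after the per-application results are combined. A sketch of the general technique in DataFrame terms (hypothetical; the tool builds the equivalent union in generated SQL):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Stamp each app's rows with a constant index before combining them,
// then sort so output from different logs stays grouped per app.
def tagAndUnion(perApp: Seq[(Int, DataFrame)]): DataFrame =
  perApp.map { case (idx, df) => df.withColumn("appIndex", lit(idx)) }
    .reduce(_ unionByName _)
    .orderBy("appIndex")
```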

- SQL Plan HealthCheck:

Prints query plan nodes that may be unsupported; for example, the `$Lambda` keyword indicates use of the Dataset API.
```
+-----+------+--------+---------------------------------------------------------------------------------------------------+
|sqlID|nodeID|nodeName|nodeDesc_first100char |
+-----+------+--------+---------------------------------------------------------------------------------------------------+
|1 |8 |Filter |Filter $line21.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$4578/0x00000008019f1840@4b63e04c.apply|
+-----+------+--------+---------------------------------------------------------------------------------------------------+
+--------+-----+------+--------+---------------------------------------------------------------------------------------------------+
|appIndex|sqlID|nodeID|nodeName|nodeDescription |
+--------+-----+------+--------+---------------------------------------------------------------------------------------------------+
|3 |1 |8 |Filter |Filter $line21.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$4578/0x00000008019f1840@4b63e04c.apply|
+--------+-----+------+--------+---------------------------------------------------------------------------------------------------+
```

### How to use this tool
@@ -172,7 +172,7 @@ class Analysis(apps: ArrayBuffer[ApplicationInfo], fileWriter: Option[ToolTextFi
     }.map( app => "(" + app.profilingDurationSQL + ")")
       .mkString(" union ")
     if (query.nonEmpty) {
-      apps.head.runQuery(query, false, fileWriter, messageHeader)
+      apps.head.runQuery(query + "order by appIndex", false, fileWriter, messageHeader)
     } else {
       apps.head.sparkSession.emptyDataFrame
     }
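The change above appends an ordering clause to the unioned per-application query so the comparison output is deterministic. A self-contained sketch of the string assembly (the literal subqueries are stand-ins for `profilingDurationSQL`):

```scala
// Each app contributes a parenthesized subquery; the union of all of
// them is then sorted on the shared appIndex column.
val fragments = Seq(
  "select 1 as appIndex, 307667 as duration",  // stand-in for app 1's SQL
  "select 2 as appIndex, 1215711 as duration") // stand-in for app 2's SQL
val query = fragments.map(f => "(" + f + ")").mkString(" union ")
val ordered = query + " order by appIndex"
// ordered: "(select 1 ...) union (select 2 ...) order by appIndex"
```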
@@ -211,7 +211,7 @@
          |round(tmp.avgShuffleReadBytes/1024/1024,2) as avgShuffleReadMB,
          |round(t.peakExecutionMemory/1024/1024,2) as taskPeakMemoryMB,
          |t.successful,
-         |substr(t.endReason,0,100) endReason_first100char
+         |substr(t.endReason,0,100) reason
          |from tmp, taskDF_${app.index} t
          |where tmp.stageId=t.StageId
          |and tmp.stageAttemptId=t.stageAttemptId
@@ -72,7 +72,7 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo],
     val messageHeader = "\nExecutor Information:\n"
     for (app <- apps) {
       if (app.allDataFrames.contains(s"executorsDF_${app.index}")) {
-        app.runQuery(query = app.generateExecutorInfo + " order by cast(executorID as long)",
+        app.runQuery(query = app.generateExecutorInfo + " order by cast(numExecutors as long)",
           fileWriter = fileWriter, messageHeader = messageHeader)
       } else {
         fileWriter.foreach(_.write("No Executor Information Found!\n"))
@@ -98,7 +98,7 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo],
     val messageHeader = "\nSpark Rapids parameters set explicitly:\n"
     for (app <- apps) {
       if (app.allDataFrames.contains(s"propertiesDF_${app.index}")) {
-        app.runQuery(query = app.generateRapidsProperties + " order by key",
+        app.runQuery(query = app.generateRapidsProperties + " order by propertyName",
           fileWriter = fileWriter, messageHeader = messageHeader)
       } else {
         fileWriter.foreach(_.write("No Spark Rapids parameters Found!\n"))
@@ -26,7 +26,7 @@ import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
  * CompareApplications compares multiple ApplicationInfo objects
  */
 class CompareApplications(apps: ArrayBuffer[ApplicationInfo],
-    fileWriter: ToolTextFileWriter) extends Logging {
+    fileWriter: Option[ToolTextFileWriter]) extends Logging {

   require(apps.size>1)

@@ -36,15 +36,19 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo],
     var query = ""
     var i = 1
     for (app <- apps) {
-      query += app.generateAppInfo
-      if (i < apps.size) {
-        query += "\n union \n"
+      if (app.allDataFrames.contains(s"appDF_${app.index}")) {
+        query += app.generateAppInfo
+        if (i < apps.size) {
+          query += "\n union \n"
+        } else {
+          query += " order by appIndex"
+        }
       } else {
-        query += " order by appIndex"
+        fileWriter.foreach(_.write("No Application Information Found!\n"))
       }
       i += 1
     }
-    apps.head.runQuery(query = query, fileWriter = Some(fileWriter), messageHeader = messageHeader)
+    apps.head.runQuery(query = query, fileWriter = fileWriter, messageHeader = messageHeader)
   }

// Compare Job information
@@ -53,15 +57,19 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo],
     var query = ""
     var i = 1
     for (app <- apps) {
-      query += app.jobtoStagesSQL
-      if (i < apps.size) {
-        query += "\n union \n"
+      if (app.allDataFrames.contains(s"jobDF_${app.index}")) {
+        query += app.jobtoStagesSQL
+        if (i < apps.size) {
+          query += "\n union \n"
+        } else {
+          query += " order by appIndex"
+        }
       } else {
-        query += " order by appIndex"
+        fileWriter.foreach(_.write("No Job Information Found!\n"))
       }
       i += 1
     }
-    apps.head.runQuery(query = query, fileWriter = Some(fileWriter), messageHeader = messageHeader)
+    apps.head.runQuery(query = query, fileWriter = fileWriter, messageHeader = messageHeader)
   }

// Compare Executors information
@@ -70,52 +78,53 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo],
     var query = ""
     var i = 1
     for (app <- apps) {
-      query += app.generateExecutorInfo
-      if (i < apps.size) {
-        query += "\n union \n"
+      if (app.allDataFrames.contains(s"executorsDF_${app.index}")) {
+        query += app.generateExecutorInfo
+        if (i < apps.size) {
+          query += "\n union \n"
+        } else {
+          query += " order by appIndex"
+        }
       } else {
-        query += " order by appIndex, cast(executorID as long)"
+        fileWriter.foreach(_.write("No Executor Information Found!\n"))
       }
       i += 1
     }
-    apps.head.runQuery(query = query, fileWriter = Some(fileWriter), messageHeader = messageHeader)
+    apps.head.runQuery(query = query, fileWriter = fileWriter, messageHeader = messageHeader)
   }

   // Compare Rapids Properties which are set explicitly
-  def compareRapidsProperties(): Unit ={
+  def compareRapidsProperties(): Unit = {
     val messageHeader = "\n\nCompare Rapids Properties which are set explicitly:\n"
     var withClauseAllKeys = "with allKeys as \n ("
-    val selectKeyPart = "select allKeys.key"
+    val selectKeyPart = "select allKeys.propertyName"
     var selectValuePart = ""
     var query = " allKeys LEFT OUTER JOIN \n"
     var i = 1
     for (app <- apps) {
-      // For the 1st app
-      if (i == 1) {
-        withClauseAllKeys += "select distinct key from (" +
-          app.generateRapidsProperties + ") union "
-        query += "(" + app.generateRapidsProperties + s") tmp_$i"
-        query += s" on allKeys.key=tmp_$i.key"
-        query += "\n LEFT OUTER JOIN \n"
-      } else if (i < apps.size) { // For the 2nd to non-last app(s)
-        withClauseAllKeys += "select distinct key from (" +
-          app.generateRapidsProperties + ") union "
-        query += "(" + app.generateRapidsProperties + s") tmp_$i"
-        query += s" on allKeys.key=tmp_$i.key"
-        query += "\n LEFT OUTER JOIN \n"
-      } else { // For the last app
-        withClauseAllKeys += "select distinct key from (" +
-          app.generateRapidsProperties + "))\n"
-        query += "(" + app.generateRapidsProperties + s") tmp_$i"
-        query += s" on allKeys.key=tmp_$i.key"
+      if (app.allDataFrames.contains(s"propertiesDF_${app.index}")) {
+        if (i < apps.size) {
+          withClauseAllKeys += "select distinct propertyName from (" +
+            app.generateRapidsProperties + ") union "
+          query += "(" + app.generateRapidsProperties + s") tmp_$i"
+          query += s" on allKeys.propertyName=tmp_$i.propertyName"
+          query += "\n LEFT OUTER JOIN \n"
+        } else { // For the last app
+          withClauseAllKeys += "select distinct propertyName from (" +
+            app.generateRapidsProperties + "))\n"
+          query += "(" + app.generateRapidsProperties + s") tmp_$i"
+          query += s" on allKeys.propertyName=tmp_$i.propertyName"
+        }
+        selectValuePart += s",appIndex_${app.index}"
+      } else {
+        fileWriter.foreach(_.write("No Spark Rapids parameters Found!\n"))
      }
-      selectValuePart += s",value_app$i"
       i += 1
     }

     query = withClauseAllKeys + selectKeyPart + selectValuePart +
-      " from (\n" + query + "\n) order by key"
+      " from (\n" + query + "\n) order by propertyName"
     logDebug("Running query " + query)
-    apps.head.runQuery(query = query, fileWriter = Some(fileWriter), messageHeader = messageHeader)
+    apps.head.runQuery(query = query, fileWriter = fileWriter, messageHeader = messageHeader)
   }
 }
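For two event logs, the query assembled by `compareRapidsProperties` has roughly the following shape: a CTE of all distinct property names, with each app's properties left-outer-joined back onto it so a property unset in one app surfaces as null in that app's column. An approximation held in a Scala string (the `<props of app N>` placeholders stand for `generateRapidsProperties` output):

```scala
// Approximate shape of the generated comparison query for two apps.
val generatedShape =
  """with allKeys as
    | (select distinct propertyName from (<props of app 1>) union
    |  select distinct propertyName from (<props of app 2>))
    |select allKeys.propertyName, appIndex_1, appIndex_2
    |from (
    | allKeys LEFT OUTER JOIN (<props of app 1>) tmp_1
    |   on allKeys.propertyName=tmp_1.propertyName
    | LEFT OUTER JOIN (<props of app 2>) tmp_2
    |   on allKeys.propertyName=tmp_2.propertyName
    |) order by propertyName""".stripMargin
```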
@@ -141,7 +141,7 @@ object ProfileMain extends Logging {
     if (appArgs.compare()) { // Compare Applications

       textFileWriter.write("### A. Compare Information Collected ###")
-      val compare = new CompareApplications(apps, textFileWriter)
+      val compare = new CompareApplications(apps, Some(textFileWriter))
       compare.compareAppInfo()
       compare.compareExecutorInfo()
       compare.compareJobInfo()
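Wrapping the writer in `Option` is what lets the comparison code above degrade gracefully: the CLI path passes `Some(textFileWriter)`, while a caller without a report file could pass `None`, turning every `foreach`-guarded write into a no-op. A toy illustration of the idea (the `Report` class is hypothetical, not part of the tool):

```scala
import java.io.{StringWriter, Writer}

object OptionWriterDemo {
  // Toy stand-in for the Option[ToolTextFileWriter] pattern in this PR.
  class Report(writer: Option[Writer]) {
    def note(msg: String): Unit = writer.foreach(_.write(msg))
  }

  def main(args: Array[String]): Unit = {
    val cliPath  = new Report(Some(new StringWriter())) // writes are captured
    val headless = new Report(None)                     // writes are no-ops
    headless.note("No Executor Information Found!\n")   // safely ignored
  }
}
```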