
Profile/qualification tool error handling improvements and support spark < 3.1.1 #2604

Merged
merged 47 commits on Jun 6, 2021
Changes from all commits
Commits
47 commits
9ffe53e
Qualification tool
tgravescs Jun 3, 2021
56c8cc2
remove unused func
tgravescs Jun 3, 2021
ef5cf4c
Add missing files
tgravescs Jun 3, 2021
6f5271c
Add checks for format option
tgravescs Jun 3, 2021
81dc6a4
cast columsn to string to write to text
tgravescs Jun 3, 2021
ee8c1dd
Revert "Add checks for format option"
tgravescs Jun 3, 2021
bc0acd1
cleanup
tgravescs Jun 3, 2021
e9dc1ce
update output dir
tgravescs Jun 3, 2021
aa298b2
formating
tgravescs Jun 3, 2021
f3f35f4
Update help messages
tgravescs Jun 3, 2021
10f6678
update app name
tgravescs Jun 3, 2021
9e0d104
cleanup
tgravescs Jun 3, 2021
50b4c29
put test functions back
tgravescs Jun 3, 2021
f2162f1
fix typo
tgravescs Jun 3, 2021
37b7225
add printSQLPlanMetrics and printRapidsJar
tgravescs Jun 3, 2021
208baf4
use opt
tgravescs Jun 3, 2021
30bf946
Add Analysis
tgravescs Jun 3, 2021
e04e926
format output
tgravescs Jun 3, 2021
10cfc28
Merge remote-tracking branch 'origin/branch-21.06' into profilingToolImp
tgravescs Jun 4, 2021
19d804f
more tests
tgravescs Jun 4, 2021
a4e6095
tests working
tgravescs Jun 4, 2021
21fc1e6
test rearrange utils
tgravescs Jun 4, 2021
3fd1864
move test file
tgravescs Jun 4, 2021
2507bd5
move test file right location
tgravescs Jun 4, 2021
d3418b9
add Analysis Suite
tgravescs Jun 4, 2021
bd3f4d9
update test analysis
tgravescs Jun 4, 2021
065ff96
add
tgravescs Jun 4, 2021
ec845ee
add more tests
tgravescs Jun 4, 2021
66a4e58
more tests
tgravescs Jun 4, 2021
08c8f4d
remove unneeded expectation file
tgravescs Jun 4, 2021
97cd725
Add more analysis tests
tgravescs Jun 4, 2021
3fdeb2a
comment
tgravescs Jun 4, 2021
3308e65
cleanup
tgravescs Jun 4, 2021
6859bf5
Start handling ResourceProfile with reflection
tgravescs Jun 4, 2021
341d998
allow spark 3.0 and 3.1.1 to parse logs and fix bug with missing table
tgravescs Jun 4, 2021
b6ad6f5
Merge remote-tracking branch 'origin/branch-21.06' into profilefixspa…
tgravescs Jun 4, 2021
7cb222b
Add more test files
tgravescs Jun 5, 2021
6193fe1
Add more tests and error handling
tgravescs Jun 5, 2021
3295bc9
improve error handling to skip bad event logs
tgravescs Jun 5, 2021
dca1292
update readme
tgravescs Jun 5, 2021
ba2098a
Merge remote-tracking branch 'origin/branch-21.06' into profilefixspa…
tgravescs Jun 5, 2021
828b302
remove unneeded temp dir
tgravescs Jun 6, 2021
f9fb166
Revert "remove unneeded temp dir"
tgravescs Jun 6, 2021
94cc5b6
close file writer on exception
tgravescs Jun 6, 2021
0fbe989
Merge remote-tracking branch 'origin/branch-21.06' into profilefixspa…
tgravescs Jun 6, 2021
1a71d26
move test files
tgravescs Jun 6, 2021
40eeead
fix move of test files
tgravescs Jun 6, 2021
2 changes: 1 addition & 1 deletion tools/README.md
@@ -9,7 +9,7 @@ Information such as Spark version, executor information, properties and so on. W
(The code is based on Apache Spark 3.1.1 source code, and tested using Spark 3.0.x and 3.1.1 event logs)

## Prerequisites
- Spark 3.1.1 or newer installed
- Spark 3.0.1 or newer installed
- Java 8 or above
- Complete Spark event log(s) from Spark 3.0 or above version.

@@ -117,11 +117,15 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo], fileWriter: FileWri
def printSQLPlanMetrics(shouldGenDot: Boolean, outputDir: String,
writeOutput: Boolean = true): Unit ={
for (app <- apps){
val messageHeader = "\nSQL Plan Metrics for Application:\n"
val accums = app.runQuery(app.generateSQLAccums, fileWriter = Some(fileWriter),
messageHeader=messageHeader)
if (shouldGenDot) {
generateDot(outputDir, Some(accums))
if (app.allDataFrames.contains(s"sqlMetricsDF_${app.index}") &&
app.allDataFrames.contains(s"driverAccumDF_${app.index}") &&
app.allDataFrames.contains(s"taskStageAccumDF_${app.index}")) {
val messageHeader = "\nSQL Plan Metrics for Application:\n"
val accums = app.runQuery(app.generateSQLAccums, fileWriter = Some(fileWriter),
messageHeader=messageHeader)
if (shouldGenDot) {
generateDot(outputDir, Some(accums))
}
}
}
}
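The guard added here skips the SQL plan metrics report entirely when an event log produced none of the backing temp tables (for example, an event log with no SQL queries, such as the new rp_nosql_eventlog test resource). A hypothetical helper, not part of this PR, expressing the same check:

```scala
// Hypothetical helper (not in this PR): report SQL plan metrics only when every
// backing temp table was registered for this application's index.
def hasSqlMetricTables(app: ApplicationInfo): Boolean = {
  Seq("sqlMetricsDF", "driverAccumDF", "taskStageAccumDF")
    .forall(prefix => app.allDataFrames.contains(s"${prefix}_${app.index}"))
}
```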
@@ -69,37 +69,51 @@ object ProfileMain extends Logging {
if (appArgs.compare()) {
// Create an Array of Applications(with an index starting from 1)
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
var index: Int = 1
for (path <- allPaths.filter(p => !p.getName.contains("."))) {
apps += new ApplicationInfo(numOutputRows, sparkSession, path, index)
index += 1
}

//Exit if there are no applications to process.
if (apps.isEmpty) {
logInfo("No application to process. Exiting")
return 0
try {
var index: Int = 1
for (path <- allPaths.filter(p => !p.getName.contains("."))) {
apps += new ApplicationInfo(numOutputRows, sparkSession, path, index)
index += 1
}

//Exit if there are no applications to process.
if (apps.isEmpty) {
logInfo("No application to process. Exiting")
return 0
}
processApps(apps, generateDot = false)
} catch {
case e: com.fasterxml.jackson.core.JsonParseException =>
fileWriter.close()
logError(s"Error parsing JSON", e)
return 1
}
processApps(apps, generateDot = false)
// Show the application Id <-> appIndex mapping.
for (app <- apps) {
logApplicationInfo(app)
}
} else {
// This mode is to process one application at one time.
var index: Int = 1
for (path <- allPaths.filter(p => !p.getName.contains("."))) {
// This apps only contains 1 app in each loop.
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val app = new ApplicationInfo(numOutputRows, sparkSession, path, index)
apps += app
logApplicationInfo(app)
// This is a bit odd that we process apps individual right now due to
// memory concerns. So the aggregation functions only aggregate single
// application not across applications.
processApps(apps, appArgs.generateDot())
app.dropAllTempViews()
index += 1
try {
for (path <- allPaths.filter(p => !p.getName.contains("."))) {
// This apps only contains 1 app in each loop.
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val app = new ApplicationInfo(numOutputRows, sparkSession, path, index)
apps += app
logApplicationInfo(app)
// This is a bit odd that we process apps individual right now due to
// memory concerns. So the aggregation functions only aggregate single
// application not across applications.
processApps(apps, appArgs.generateDot())
app.dropAllTempViews()
index += 1
}
} catch {
case e: com.fasterxml.jackson.core.JsonParseException =>
fileWriter.close()
logError(s"Error parsing JSON", e)
return 1
}
}
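Because the hunk above interleaves removed and added lines, the compare-mode branch as it reads after this change is reproduced below: application construction and processing are wrapped in a try/catch so that a malformed JSON event log closes the report writer and returns exit code 1 instead of throwing.

```scala
// Excerpt of the post-change compare-mode branch (reconstructed from the hunk above).
try {
  var index: Int = 1
  for (path <- allPaths.filter(p => !p.getName.contains("."))) {
    apps += new ApplicationInfo(numOutputRows, sparkSession, path, index)
    index += 1
  }
  // Exit if there are no applications to process.
  if (apps.isEmpty) {
    logInfo("No application to process. Exiting")
    return 0
  }
  processApps(apps, generateDot = false)
} catch {
  case e: com.fasterxml.jackson.core.JsonParseException =>
    fileWriter.close()
    logError(s"Error parsing JSON", e)
    return 1
}
```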

@@ -41,17 +41,22 @@ object Qualification extends Logging {
numRows: Int,
sparkSession: SparkSession,
includeCpuPercent: Boolean,
dropTempViews: Boolean): DataFrame = {
dropTempViews: Boolean): Option[DataFrame] = {
var index: Int = 1
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
for (path <- allPaths.filterNot(_.getName.contains("."))) {
// This apps only contains 1 app in each loop.
val app = new ApplicationInfo(numRows, sparkSession,
path, index, true)
apps += app
logApplicationInfo(app)
index += 1
try {
// This apps only contains 1 app in each loop.
val app = new ApplicationInfo(numRows, sparkSession, path, index, true)
apps += app
logApplicationInfo(app)
index += 1
} catch {
case e: com.fasterxml.jackson.core.JsonParseException =>
logWarning(s"Error parsing JSON, skipping $path")
}
}
if (apps.isEmpty) return None
val analysis = new Analysis(apps, None)
if (includeCpuPercent) {
val sqlAggMetricsDF = analysis.sqlMetricsAggregation()
@@ -65,7 +70,7 @@
sparkSession.catalog.dropTempView("sqlAggMetricsDF")
apps.foreach( _.dropAllTempViews())
}
df
Some(df)
}
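With qualifyApps now returning Option[DataFrame] (None when no event log could be qualified), callers have to guard before writing output. An illustrative caller, with variable names assumed rather than taken from this PR:

```scala
// Illustrative only: write the qualification report when at least one application
// was successfully qualified; otherwise there is nothing to write.
Qualification.qualifyApps(allPaths, numOutputRows, sparkSession,
    includeCpuPercent, dropTempViews) match {
  case Some(df) =>
    Qualification.writeQualification(df, outputDirectory, outputFormat,
      includeCpuPercent, numOutputRows)
  case None =>
    logWarning("no applications were qualified, skipping output")
}
```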

def constructQueryQualifyApps(apps: ArrayBuffer[ApplicationInfo],
@@ -78,8 +83,7 @@
case false => "(" + app.qualificationDurationNoMetricsSQL + ")"
}
}.mkString(" union ")
val df = apps.head.runQuery(query + " order by Rank desc, `App Duration` desc")
df
apps.head.runQuery(query + " order by Score desc, `App Duration` desc")
}

def writeQualification(df: DataFrame, outputDir: String,
@@ -32,7 +32,7 @@ object QualificationMain extends Logging {
*/
def main(args: Array[String]) {
val sparkSession = ProfileUtils.createSparkSession
val (exitCode, optDf) = mainInternal(sparkSession, new QualificationArgs(args))
val (exitCode, _) = mainInternal(sparkSession, new QualificationArgs(args))
if (exitCode != 0) {
System.exit(exitCode)
}
@@ -55,13 +55,13 @@

val includeCpuPercent = appArgs.includeExecCpuPercent.getOrElse(false)
val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
val df = Qualification.qualifyApps(allPaths,
val dfOpt = Qualification.qualifyApps(allPaths,
numOutputRows, sparkSession, includeCpuPercent, dropTempViews)
if (writeOutput) {
Qualification.writeQualification(df, outputDirectory,
if (writeOutput && dfOpt.isDefined) {
Qualification.writeQualification(dfOpt.get, outputDirectory,
appArgs.outputFormat.getOrElse("csv"), includeCpuPercent, numOutputRows)
}
(0, Some(df))
(0, dfOpt)
}

}
@@ -693,7 +693,7 @@ class ApplicationInfo(
s"""select
|first(appName) as `App Name`,
|'$appId' as `App ID`,
|ROUND((sum(sqlQualDuration) * 100) / first(app.duration), 2) as Rank,
|ROUND((sum(sqlQualDuration) * 100) / first(app.duration), 2) as Score,
|concat_ws(",", collect_list(problematic)) as `Potential Problems`,
|sum(sqlQualDuration) as `SQL Dataframe Duration`,
|first(app.duration) as `App Duration`
@@ -720,7 +720,7 @@
def qualificationDurationSumSQL: String = {
s"""select first(appName) as `App Name`,
|first(appID) as `App ID`,
|ROUND((sum(dfDuration) * 100) / first(appDuration), 2) as Rank,
|ROUND((sum(dfDuration) * 100) / first(appDuration), 2) as Score,
|concat_ws(",", collect_list(potentialProblems)) as `Potential Problems`,
|sum(dfDuration) as `SQL Dataframe Duration`,
|first(appDuration) as `App Duration`,
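In both queries the column renamed from Rank to Score is the qualifying SQL/DataFrame duration expressed as a percentage of the total application duration, rounded to two decimals. A quick illustration with made-up numbers:

```scala
// Hypothetical figures: 80,000 ms of qualifying SQL time in a 100,000 ms application.
val sqlQualDurationMs = 80000.0
val appDurationMs = 100000.0
val score = BigDecimal(sqlQualDurationMs * 100 / appDurationMs)
  .setScale(2, BigDecimal.RoundingMode.HALF_UP) // 80.00
```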
@@ -34,9 +34,6 @@ object EventsProcessor extends Logging {
event match {
case _: SparkListenerLogStart =>
doSparkListenerLogStart(app, event.asInstanceOf[SparkListenerLogStart])
case _: SparkListenerResourceProfileAdded =>
doSparkListenerResourceProfileAdded(app,
event.asInstanceOf[SparkListenerResourceProfileAdded])
case _: SparkListenerBlockManagerAdded =>
doSparkListenerBlockManagerAdded(app,
event.asInstanceOf[SparkListenerBlockManagerAdded])
@@ -94,7 +91,33 @@
case _: SparkListenerSQLAdaptiveSQLMetricUpdates =>
doSparkListenerSQLAdaptiveSQLMetricUpdates(app,
event.asInstanceOf[SparkListenerSQLAdaptiveSQLMetricUpdates])
case _ => doOtherEvent(app, event)
case _ =>
val wasResourceProfileAddedEvent = doSparkListenerResourceProfileAddedReflect(app, event)
if (!wasResourceProfileAddedEvent) doOtherEvent(app, event)
}
}

def doSparkListenerResourceProfileAddedReflect(
app: ApplicationInfo,
event: SparkListenerEvent): Boolean = {
val rpAddedClass = "org.apache.spark.scheduler.SparkListenerResourceProfileAdded"
if (event.getClass.getName.equals(rpAddedClass)) {
try {
event match {
case _: SparkListenerResourceProfileAdded =>
doSparkListenerResourceProfileAdded(app,
event.asInstanceOf[SparkListenerResourceProfileAdded])
true
case _ => false
}
} catch {
case _: ClassNotFoundException =>
logWarning("Error trying to parse SparkListenerResourceProfileAdded, Spark" +
" version likely older than 3.1.X, unable to parse it properly.")
false
}
} else {
false
}
}
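doSparkListenerResourceProfileAddedReflect matches on the event's class name before touching SparkListenerResourceProfileAdded, so the profiler can run against Spark versions older than 3.1.x, where that class does not exist, without failing event parsing. A sketch of a fully reflective variant, not part of this PR and assuming the case class exposes a resourceProfile accessor, which would avoid referencing the 3.1+ type at compile time altogether:

```scala
import org.apache.spark.scheduler.SparkListenerEvent

// Sketch only: detect the Spark 3.1+ event by class name and read its field via
// reflection, so the symbol never has to be present at compile time.
def resourceProfileIfAdded(event: SparkListenerEvent): Option[Any] = {
  val rpAddedClass = "org.apache.spark.scheduler.SparkListenerResourceProfileAdded"
  if (event.getClass.getName == rpAddedClass) {
    try {
      // Assumes a zero-arg `resourceProfile` accessor on the case class.
      Some(event.getClass.getMethod("resourceProfile").invoke(event))
    } catch {
      case _: ReflectiveOperationException => None
    }
  } else {
    None
  }
}
```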



25 changes: 25 additions & 0 deletions tools/src/test/resources/spark-events-profiling/rp_nosql_eventlog

Large diffs are not rendered by default.

477 changes: 477 additions & 0 deletions tools/src/test/resources/spark-events-profiling/rp_sql_eventlog

Large diffs are not rendered by default.

@@ -23,7 +23,7 @@ import org.scalatest.FunSuite
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, SparkSession, TrampolineUtil}
import org.apache.spark.sql.rapids.tool.profiling._

class ApplicationInfoSuite extends FunSuite with Logging {
@@ -82,6 +82,42 @@ class ApplicationInfoSuite extends FunSuite with Logging {
assert(cuDFJar.size == 1, "CUDF jar check")
}

test("test sql and resourceprofile eventlog") {
val eventLog = s"$logDir/rp_sql_eventlog"
TrampolineUtil.withTempDir { tempDir =>
val appArgs = new ProfileArgs(Array(
"--output-directory",
tempDir.getAbsolutePath,
eventLog))
val exit = ProfileMain.mainInternal(sparkSession, appArgs)
assert(exit == 0)
}
}

test("malformed json eventlog") {
val eventLog = s"$logDir/malformed_json_eventlog"
TrampolineUtil.withTempDir { tempDir =>
val appArgs = new ProfileArgs(Array(
"--output-directory",
tempDir.getAbsolutePath,
eventLog))
val exit = ProfileMain.mainInternal(sparkSession, appArgs)
assert(exit == 1)
}
}

test("test no sql eventlog") {
val eventLog = s"$logDir/rp_nosql_eventlog"
TrampolineUtil.withTempDir { tempDir =>
val appArgs = new ProfileArgs(Array(
"--output-directory",
tempDir.getAbsolutePath,
eventLog))
val exit = ProfileMain.mainInternal(sparkSession, appArgs)
assert(exit == 0)
}
}

test("test printSQLPlanMetrics") {
var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val appArgs =
@@ -68,6 +68,13 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
}
}

test("skip malformed json eventlog") {
val profileLogDir = ToolTestUtils.getTestResourcePath("spark-events-profiling")
val badEventLog = s"$profileLogDir/malformed_json_eventlog"
val logFiles = Array(s"$logDir/nds_q86_test", badEventLog)
runQualificationTest(logFiles, "nds_q86_test_expectation.csv")
}

test("test udf event logs") {
val logFiles = Array(
s"$logDir/dataset_eventlog",