Add all stage metrics to tools output (#1151)
* Add Gpu metrics to profiling tool output

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* add file

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* addressed review comments

* address review comments

* addressed review comments, removed unwanted code.

---------

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
nartal1 authored Jul 3, 2024
1 parent 93e512e commit 91fed9d
Showing 8 changed files with 197 additions and 6 deletions.
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.analysis
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
-import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
+import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

import org.apache.spark.sql.execution.SparkPlanInfo
@@ -342,6 +342,58 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}
}
}

/**
* Generate the stage level metrics for the SQL plan including GPU metrics if applicable.
* In addition to the Spark-defined metrics, the following GPU metrics are collected when
* they are present in the eventlog:
* gpuSemaphoreWait, gpuRetryCount, gpuSplitAndRetryCount, gpuRetryBlockTime,
* gpuRetryComputationTime, gpuSpillToHostTime, gpuSpillToDiskTime, gpuReadSpillFromHostTime,
* gpuReadSpillFromDiskTime
*
* @return a sequence of AccumProfileResults
*/
def generateStageLevelAccums(): Seq[AccumProfileResults] = {

def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
// drop the metric if there are no update values
if (updates.isEmpty) {
None
} else if (updates.length == 1) {
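// a single update contributes only to the total; min/med/max are reported as 0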
Some(StatisticsMetrics(0L, 0L, 0L, updates.sum))
} else {
Some(StatisticsMetrics(
min = updates.head,
med = updates(updates.size / 2),
max = updates.last,
total = updates.sum
))
}
}

// Process taskStageAccumMap to get all the accumulators
val stageLevelAccums = app.taskStageAccumMap.values.flatten
val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId)
groupedByAccumulatorId.flatMap { case (accumulatorId, accums) =>
// Extract and sort the update values, dropping entries that did not report an update
val sortedUpdates = accums.flatMap(_.update).toSeq.sorted

// Compute the statistics for the accumulator if applicable
computeStatistics(sortedUpdates).map { stats =>
val sampleAccum = accums.head
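// Note: if an accumulator appears in multiple stages, only the first
// entry's stageId and name are reported for the whole group.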
AccumProfileResults(
appIndex = appIndex,
stageId = sampleAccum.stageId.toString,
accumulatorId = accumulatorId,
name = sampleAccum.name.getOrElse("Unknown"),
min = stats.min,
median = stats.med,
max = stats.max,
total = stats.total
)
}
}.toSeq
}
}

object AppSQLPlanAnalyzer {
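For reference, a minimal self-contained sketch (not part of the commit) that mirrors the statistics logic above; StatisticsMetrics is redefined locally so the snippet runs on its own. Note that a single update contributes only to the total, and for an even number of updates "med" is the upper-middle element of the sorted sequence rather than an interpolated median:

// Standalone mirror of computeStatistics, for illustration only.
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
  val sorted = updates.sorted
  if (sorted.isEmpty) {
    None
  } else if (sorted.length == 1) {
    Some(StatisticsMetrics(0L, 0L, 0L, sorted.sum))
  } else {
    Some(StatisticsMetrics(sorted.head, sorted(sorted.size / 2), sorted.last, sorted.sum))
  }
}

// e.g. updates 1, 5, 3 -> min = 1, med = 3, max = 5, total = 9
assert(computeStatistics(Seq(1L, 5L, 3L)).contains(StatisticsMetrics(1L, 3L, 5L, 9L)))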
@@ -26,6 +26,7 @@ case class ApplicationSummaryInfo(
rapidsProps: Seq[RapidsPropertyProfileResult],
rapidsJar: Seq[RapidsJarProfileResult],
sqlMetrics: Seq[SQLAccumProfileResults],
stageMetrics: Seq[AccumProfileResults],
jobAggMetrics: Seq[JobAggTaskMetricsProfileResult],
stageAggMetrics: Seq[StageAggTaskMetricsProfileResult],
sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult],
@@ -84,6 +84,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
ProfSQLPlanMetricsView.getRawView(apps)
}

// Return all stage-level metrics
def getStageLevelMetrics: Seq[AccumProfileResults] = {
ProfStageMetricView.getRawView(apps)
}

/**
* This function is meant to clean up Delta log execs so that you could align
* SQL ids between CPU and GPU eventlogs. It attempts to remove any delta log
@@ -103,6 +108,12 @@ object CollectInformation extends Logging {
}
}

def generateStageLevelAccums(apps: Seq[ApplicationInfo]): Seq[AccumProfileResults] = {
apps.flatMap { app =>
app.planMetricProcessor.generateStageLevelAccums()
}
}

def printSQLPlans(apps: Seq[ApplicationInfo], outputDir: String): Unit = {
for (app <- apps) {
val planFileWriter = new ToolTextFileWriter(s"$outputDir/${app.appId}",
@@ -219,6 +219,23 @@ case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
}
}

case class AccumProfileResults(appIndex: Int, stageId: String, accumulatorId: Long, name: String,
min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
"median", "max", "total")

override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, stageId, accumulatorId.toString, name, min.toString, median.toString,
max.toString, total.toString)
}

override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString, StringUtils.reformatCSVString(stageId), accumulatorId.toString,
StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
total.toString)
}
}

case class ResourceProfileInfoCase(
val resourceProfileId: Int,
val executorResources: Map[String, ExecutorResourceRequest],
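To make the new row shape concrete, here is a hypothetical instance of AccumProfileResults (the values are borrowed from the gpuSemaphoreWait accumulator in the test eventlog added later in this commit):

// Hypothetical example row for the new stage-level metrics output.
val row = AccumProfileResults(appIndex = 1, stageId = "10", accumulatorId = 1010,
  name = "gpuSemaphoreWait", min = 0L, median = 0L, max = 0L, total = 492L)
// row.convertToSeq == Seq("1", "10", "1010", "gpuSemaphoreWait", "0", "0", "0", "492")
// convertToCSVSeq is the same except the string columns are re-quoted
// via StringUtils.reformatCSVString.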
@@ -329,6 +329,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val systemProps = collect.getSystemProperties
val rapidsJar = collect.getRapidsJARInfo
val sqlMetrics = collect.getSQLPlanMetrics
val stageMetrics = collect.getStageLevelMetrics
val wholeStage = collect.getWholeStageCodeGenMapping
// for compare mode we just add in extra tables for matching across applications
// the rest of the tables simply list all applications specified
@@ -392,7 +393,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
(ApplicationSummaryInfo(appInfo, dsInfo,
collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
-rapidsJar, sqlMetrics, analysis.jobAggs, analysis.stageAggs,
+rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
@@ -471,6 +472,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
combineProps("rapids", appsSum).sortBy(_.key),
appsSum.flatMap(_.rapidsJar).sortBy(_.appIndex),
appsSum.flatMap(_.sqlMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.stageMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.jobAggMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.stageAggMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.sqlTaskAggMetrics).sortBy(_.appIndex),
@@ -513,6 +515,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
Some(ProfRapidsJarView.getDescription))
profileOutputWriter.write(ProfSQLPlanMetricsView.getLabel, app.sqlMetrics,
Some(ProfSQLPlanMetricsView.getDescription))
profileOutputWriter.write(ProfStageMetricView.getLabel, app.stageMetrics,
Some(ProfStageMetricView.getDescription))
profileOutputWriter.write(ProfSQLCodeGenView.getLabel, app.wholeStage,
Some(ProfSQLCodeGenView.getDescription))
comparedRes.foreach { compareSum =>
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.views

import com.nvidia.spark.rapids.tool.analysis.{AppSQLPlanAnalyzer, ProfAppIndexMapperTrait}
import com.nvidia.spark.rapids.tool.profiling.AccumProfileResults

import org.apache.spark.sql.rapids.tool.AppBase
import org.apache.spark.sql.rapids.tool.annotation.Since
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

trait AppStageMetricsViewTrait extends ViewableTrait[AccumProfileResults] {
override def getLabel: String = "Stage Level All Metrics"
override def getDescription: String = "Stage Level Metrics"

override def sortView(
rows: Seq[AccumProfileResults]): Seq[AccumProfileResults] = {
rows.sortBy(cols => (cols.stageId, cols.appIndex, cols.accumulatorId))
}
}

@Since("24.06.2")
object ProfStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {

override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
app match {
case app: ApplicationInfo =>
app.planMetricProcessor.generateStageLevelAccums()
case _ => Seq.empty
}
}
}

object QualStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {
// Kept for a follow-up refactor that will customize the view based on the app type (Qual/Prof)
override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
Seq.empty
}

def getRawViewFromSqlProcessor(
sqlAnalyzer: AppSQLPlanAnalyzer): Seq[AccumProfileResults] = {
sortView(sqlAnalyzer.generateStageLevelAccums())
}
}
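One subtlety of the sort above: because AccumProfileResults.stageId is a String, sortView orders stage IDs lexicographically rather than numerically, for example:

// Lexicographic ordering of string stage IDs: "10" sorts before "2".
assert(Seq("2", "10", "1").sorted == Seq("1", "10", "2"))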
@@ -75,6 +75,9 @@ object QualRawReportGenerator {
pWriter.write(QualExecutorView.getLabel, QualExecutorView.getRawView(Seq(app)))
pWriter.write(QualAppJobView.getLabel, QualAppJobView.getRawView(Seq(app)))
generateSQLProcessingView(pWriter, sqlPlanAnalyzer)
pWriter.write(QualStageMetricView.getLabel,
QualStageMetricView.getRawViewFromSqlProcessor(sqlPlanAnalyzer),
Some(QualStageMetricView.getDescription))
pWriter.write(RapidsQualPropertiesView.getLabel,
RapidsQualPropertiesView.getRawView(Seq(app)),
Some(RapidsQualPropertiesView.getDescription))
@@ -17,6 +17,7 @@
package com.nvidia.spark.rapids.tool.profiling

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

import scala.collection.mutable.ArrayBuffer
@@ -202,6 +203,50 @@ class ApplicationInfoSuite extends FunSuite with Logging {
ToolTestUtils.compareDataFrames(df, dfExpect)
}

test("test GpuMetrics in eventlog") {
TrampolineUtil.withTempDir { outputDir =>
TrampolineUtil.withTempDir { tmpEventLogDir =>
val eventLogFilePath = Paths.get(tmpEventLogDir.getAbsolutePath, "gpu_metrics_eventlog")
// scalastyle:off line.size.limit
val eventLogContent =
"""{"Event":"SparkListenerLogStart","Spark Version":"3.2.1"}
|{"Event":"SparkListenerApplicationStart","App Name":"GPUMetrics", "App ID":"local-16261043003", "Timestamp":123456, "User":"User1"}
|{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5073,"Index":5054,"Attempt":0,"Partition ID":5054,"Launch Time":1712248533994,"Executor ID":"100","Host":"10.154.65.143","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253284920,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.492","Value":"03:13:31.359","Internal":false,"Count Failed Values":true},{"ID":1018,"Name":"gpuSpillToHostTime","Update":"00:00:00.845","Value":"00:29:39.521","Internal":false,"Count Failed Values":true},{"ID":1016,"Name":"gpuSplitAndRetryCount","Update":"1","Value":"2","Internal":false,"Count Failed Values":true}]}}""".stripMargin
// scalastyle:on line.size.limit
Files.write(eventLogFilePath, eventLogContent.getBytes(StandardCharsets.UTF_8))

val profileArgs = Array(
"--output-directory", outputDir.getAbsolutePath,
eventLogFilePath.toString
)

val appArgs = new ProfileArgs(profileArgs)
val (exit, _) = ProfileMain.mainInternal(appArgs)
assert(exit == 0)

val apps = ArrayBuffer[ApplicationInfo]()
var index = 1

val eventLogPaths = appArgs.eventlog()
eventLogPaths.foreach { path =>
val eventLogInfo = EventLogPathProcessor.getEventLogInfo(path, hadoopConf).head._1
apps += new ApplicationInfo(hadoopConf, eventLogInfo, index)
index += 1
}
assert(apps.size == 1)

val collect = new CollectInformation(apps)
val gpuMetrics = collect.getStageLevelMetrics

// The sample eventlog contains 3 GPU metrics: gpuSemaphoreWait,
// gpuSpillToHostTime, gpuSplitAndRetryCount
assert(gpuMetrics.size == 3)
val gpuSemaphoreWait = gpuMetrics.find(_.name == "gpuSemaphoreWait")
assert(gpuSemaphoreWait.isDefined)
}
}
}

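Note that some of the GPU accumulators in the sample eventlog (e.g. gpuSemaphoreWait) carry duration-formatted updates such as "00:00:00.492". As an illustration only (this helper is hypothetical, not the tool's actual parser), converting such a string to milliseconds could look like:

// Illustrative duration parser, assuming an HH:MM:SS.mmm format.
def durationToMillis(s: String): Long = {
  val Array(h, m, rest) = s.split(":")
  val Array(sec, ms) = rest.split("\\.")
  ((h.toLong * 60 + m.toLong) * 60 + sec.toLong) * 1000 + ms.toLong
}
assert(durationToMillis("00:00:00.492") == 492L)
assert(durationToMillis("03:13:31.359") == 11611359L)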
test("test printSQLPlans") {
TrampolineUtil.withTempDir { tempOutputDir =>
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
@@ -790,7 +835,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
-assert(dotDirs.length === 19)
+assert(dotDirs.length === 20)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
@@ -824,7 +869,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
-assert(dotDirs.length === 15)
+assert(dotDirs.length === 16)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
@@ -861,7 +906,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
-assert(dotDirs.length === 19)
+assert(dotDirs.length === 20)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
@@ -898,7 +943,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
-assert(dotDirs.length === 17)
+assert(dotDirs.length === 18)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
