Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add all stage metrics to tools output #1151

Merged
merged 8 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.analysis
import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}

import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

import org.apache.spark.sql.execution.SparkPlanInfo
Expand Down Expand Up @@ -342,6 +342,58 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}
}
}

/**
* Generate the stage level metrics for the SQL plan including GPU metrics if applicable.
* Along with Spark defined metrics, below is the list of GPU metrics that are collected if they
* are present in the eventlog:
* gpuSemaphoreWait, gpuRetryCount, gpuSplitAndRetryCount, gpuRetryBlockTime,
* gpuRetryComputationTime, gpuSpillToHostTime, gpuSpillToDiskTime, gpuReadSpillFromHostTime,
* gpuReadSpillFromDiskTime
*
* @return a sequence of AccumProfileResults
*/
def generateStageLevelAccums(): Seq[AccumProfileResults] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is actually a stageLevel thing but it is defined in the AppSQLPlanAnalyzer which does not look like a good fit.
I am fine to keep it here for now.


def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
// drop the metrics if there are no values
if (updates.isEmpty) {
None
} else if (updates.length == 1) {
Some(StatisticsMetrics(0L, 0L, 0L, updates.sum))
} else {
Some(StatisticsMetrics(
min = updates.head,
med = updates(updates.size / 2),
max = updates.last,
total = updates.sum
))
}
}

// Process taskStageAccumMap to get all the accumulators
val stageLevelAccums = app.taskStageAccumMap.values.flatten
val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId)
groupedByAccumulatorId.flatMap { case (accumulatorId, accums) =>
// Extract and sort the update values, defaulting to 0 if not present
val sortedUpdates = accums.flatMap(_.update).toSeq.sorted
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to sort here? If we sort the final result in AppGpuMetricsViewTrait, then we are double sorting?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to sort the list to get the min, median and max for a given accumulator. The other sort is for the View.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.
It looks like a code efficiency problem. Picking min, max, med, and then sum seems to be done in several iterations than necessary.
Anyway, this is not part of this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Compute the statistics for the accumulator if applicable
computeStatistics(sortedUpdates).map { stats =>
val sampleAccum = accums.head
AccumProfileResults(
appIndex = appIndex,
stageId = sampleAccum.stageId.toString,
accumulatorId = accumulatorId,
name = sampleAccum.name.getOrElse("Unknown"),
min = stats.min,
median = stats.med,
max = stats.max,
total = stats.total
)
}
}.toSeq
}
}

object AppSQLPlanAnalyzer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ case class ApplicationSummaryInfo(
rapidsProps: Seq[RapidsPropertyProfileResult],
rapidsJar: Seq[RapidsJarProfileResult],
sqlMetrics: Seq[SQLAccumProfileResults],
stageMetrics: Seq[AccumProfileResults],
jobAggMetrics: Seq[JobAggTaskMetricsProfileResult],
stageAggMetrics: Seq[StageAggTaskMetricsProfileResult],
sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
ProfSQLPlanMetricsView.getRawView(apps)
}

// Print all Stage level Metrics
def getStageLevelMetrics: Seq[AccumProfileResults] = {
ProfStageMetricView.getRawView(apps)
}

/**
* This function is meant to clean up Delta log execs so that you could align
* SQL ids between CPU and GPU eventlogs. It attempts to remove any delta log
Expand All @@ -103,6 +108,12 @@ object CollectInformation extends Logging {
}
}

def generateStageLevelAccums(apps: Seq[ApplicationInfo]): Seq[AccumProfileResults] = {
apps.flatMap { app =>
app.planMetricProcessor.generateStageLevelAccums()
}
}

def printSQLPlans(apps: Seq[ApplicationInfo], outputDir: String): Unit = {
for (app <- apps) {
val planFileWriter = new ToolTextFileWriter(s"$outputDir/${app.appId}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,23 @@ case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
}
}

case class AccumProfileResults(appIndex: Int, stageId: String, accumulatorId: Long, name: String,
min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
"median", "max", "total")

override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, stageId, accumulatorId.toString, name, min.toString, median.toString,
max.toString, total.toString)
}

override def convertToCSVSeq: Seq[String] = {
Seq(appIndex.toString, StringUtils.reformatCSVString(stageId), accumulatorId.toString,
StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
total.toString)
}
}

case class ResourceProfileInfoCase(
val resourceProfileId: Int,
val executorResources: Map[String, ExecutorResourceRequest],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val systemProps = collect.getSystemProperties
val rapidsJar = collect.getRapidsJARInfo
val sqlMetrics = collect.getSQLPlanMetrics
val stageMetrics = collect.getStageLevelMetrics
val wholeStage = collect.getWholeStageCodeGenMapping
// for compare mode we just add in extra tables for matching across applications
// the rest of the tables simply list all applications specified
Expand Down Expand Up @@ -392,7 +393,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
(ApplicationSummaryInfo(appInfo, dsInfo,
collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
rapidsJar, sqlMetrics, analysis.jobAggs, analysis.stageAggs,
rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
Expand Down Expand Up @@ -471,6 +472,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
combineProps("rapids", appsSum).sortBy(_.key),
appsSum.flatMap(_.rapidsJar).sortBy(_.appIndex),
appsSum.flatMap(_.sqlMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.stageMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.jobAggMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.stageAggMetrics).sortBy(_.appIndex),
appsSum.flatMap(_.sqlTaskAggMetrics).sortBy(_.appIndex),
Expand Down Expand Up @@ -513,6 +515,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
Some(ProfRapidsJarView.getDescription))
profileOutputWriter.write(ProfSQLPlanMetricsView.getLabel, app.sqlMetrics,
Some(ProfSQLPlanMetricsView.getDescription))
profileOutputWriter.write(ProfStageMetricView.getLabel, app.stageMetrics,
Some(ProfStageMetricView.getDescription))
profileOutputWriter.write(ProfSQLCodeGenView.getLabel, app.wholeStage,
Some(ProfSQLCodeGenView.getDescription))
comparedRes.foreach { compareSum =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.views

import com.nvidia.spark.rapids.tool.analysis.{AppSQLPlanAnalyzer, ProfAppIndexMapperTrait}
import com.nvidia.spark.rapids.tool.profiling.AccumProfileResults

import org.apache.spark.sql.rapids.tool.AppBase
import org.apache.spark.sql.rapids.tool.annotation.Since
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

trait AppStageMetricsViewTrait extends ViewableTrait[AccumProfileResults] {
override def getLabel: String = "Stage Level All Metrics"
override def getDescription: String = "Stage Level Metrics"

override def sortView(
rows: Seq[AccumProfileResults]): Seq[AccumProfileResults] = {
rows.sortBy(cols => (cols.stageId, cols.appIndex, cols.accumulatorId))
}
}

@Since("24.06.2")
object ProfStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {

override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
app match {
case app: ApplicationInfo =>
app.planMetricProcessor.generateStageLevelAccums()
case _ => Seq.empty
}
}
}

object QualStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {
// Keep for the following refactor to customize the view based on the app type (Qual/Prof)
override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
Seq.empty
}

def getRawViewFromSqlProcessor(
sqlAnalyzer: AppSQLPlanAnalyzer): Seq[AccumProfileResults] = {
sortView(sqlAnalyzer.generateStageLevelAccums())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ object QualRawReportGenerator {
pWriter.write(QualExecutorView.getLabel, QualExecutorView.getRawView(Seq(app)))
pWriter.write(QualAppJobView.getLabel, QualAppJobView.getRawView(Seq(app)))
generateSQLProcessingView(pWriter, sqlPlanAnalyzer)
pWriter.write(QualStageMetricView.getLabel,
QualStageMetricView.getRawViewFromSqlProcessor(sqlPlanAnalyzer),
Some(QualStageMetricView.getDescription))
pWriter.write(RapidsQualPropertiesView.getLabel,
RapidsQualPropertiesView.getRawView(Seq(app)),
Some(RapidsQualPropertiesView.getDescription))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.nvidia.spark.rapids.tool.profiling

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -202,6 +203,50 @@ class ApplicationInfoSuite extends FunSuite with Logging {
ToolTestUtils.compareDataFrames(df, dfExpect)
}

test("test GpuMetrics in eventlog") {
TrampolineUtil.withTempDir { outputDir =>
TrampolineUtil.withTempDir { tmpEventLogDir =>
val eventLogFilePath = Paths.get(tmpEventLogDir.getAbsolutePath, "gpu_metrics_eventlog")
// scalastyle:off line.size.limit
val eventLogContent =
amahussein marked this conversation as resolved.
Show resolved Hide resolved
"""{"Event":"SparkListenerLogStart","Spark Version":"3.2.1"}
|{"Event":"SparkListenerApplicationStart","App Name":"GPUMetrics", "App ID":"local-16261043003", "Timestamp":123456, "User":"User1"}
|{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5073,"Index":5054,"Attempt":0,"Partition ID":5054,"Launch Time":1712248533994,"Executor ID":"100","Host":"10.154.65.143","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253284920,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.492","Value":"03:13:31.359","Internal":false,"Count Failed Values":true},{"ID":1018,"Name":"gpuSpillToHostTime","Update":"00:00:00.845","Value":"00:29:39.521","Internal":false,"Count Failed Values":true},{"ID":1016,"Name":"gpuSplitAndRetryCount","Update":"1","Value":"2","Internal":false,"Count Failed Values":true}]}}""".stripMargin
// scalastyle:on line.size.limit
Files.write(eventLogFilePath, eventLogContent.getBytes(StandardCharsets.UTF_8))

val profileArgs = Array(
"--output-directory", outputDir.getAbsolutePath,
eventLogFilePath.toString
)

val appArgs = new ProfileArgs(profileArgs)
val (exit, _) = ProfileMain.mainInternal(appArgs)
assert(exit == 0)

val apps = ArrayBuffer[ApplicationInfo]()
var index = 1

val eventLogPaths = appArgs.eventlog()
eventLogPaths.foreach { path =>
val eventLogInfo = EventLogPathProcessor.getEventLogInfo(path, hadoopConf).head._1
apps += new ApplicationInfo(hadoopConf, eventLogInfo, index)
index += 1
}
assert(apps.size == 1)

val collect = new CollectInformation(apps)
val gpuMetrics = collect.getStageLevelMetrics

// Sample eventlog has 3 gpu metrics, gpuSemaphoreWait,
// gpuSpillToHostTime, gpuSplitAndRetryCount
assert(gpuMetrics.size == 3)
val gpuSemaphoreWait = gpuMetrics.find(_.name == "gpuSemaphoreWait")
assert(gpuSemaphoreWait.isDefined)
}
}
}

test("test printSQLPlans") {
TrampolineUtil.withTempDir { tempOutputDir =>
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
Expand Down Expand Up @@ -790,7 +835,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
assert(dotDirs.length === 19)
assert(dotDirs.length === 20)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
Expand Down Expand Up @@ -824,7 +869,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
assert(dotDirs.length === 15)
assert(dotDirs.length === 16)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
Expand Down Expand Up @@ -861,7 +906,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
assert(dotDirs.length === 19)
assert(dotDirs.length === 20)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
Expand Down Expand Up @@ -898,7 +943,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
f.endsWith(".csv")
})
// compare the number of files generated
assert(dotDirs.length === 17)
assert(dotDirs.length === 18)
for (file <- dotDirs) {
assert(file.getAbsolutePath.endsWith(".csv"))
// just load each one to make sure formatted properly
Expand Down