diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
index 29b9ece3d..7ffd01872 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids.tool.analysis
 import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}
 
 import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
-import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
+import com.nvidia.spark.rapids.tool.profiling.{AccumProfileResults, DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
 import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer
 
 import org.apache.spark.sql.execution.SparkPlanInfo
@@ -342,6 +342,58 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
       }
     }
   }
+
+  /**
+   * Generate the stage-level metrics for the SQL plan, including GPU metrics if applicable.
+   * In addition to the Spark-defined metrics, the following GPU metrics are collected when they
+   * are present in the eventlog:
+   * gpuSemaphoreWait, gpuRetryCount, gpuSplitAndRetryCount, gpuRetryBlockTime,
+   * gpuRetryComputationTime, gpuSpillToHostTime, gpuSpillToDiskTime, gpuReadSpillFromHostTime,
+   * gpuReadSpillFromDiskTime
+   *
+   * @return a sequence of AccumProfileResults
+   */
+  def generateStageLevelAccums(): Seq[AccumProfileResults] = {
+
+    def computeStatistics(updates: Seq[Long]): Option[StatisticsMetrics] = {
+      // drop the metrics if there are no values
+      if (updates.isEmpty) {
+        None
+      } else if (updates.length == 1) {
+        Some(StatisticsMetrics(0L, 0L, 0L, updates.sum))
+      } else {
+        Some(StatisticsMetrics(
+          min = updates.head,
+          med = updates(updates.size / 2),
+          max = updates.last,
+          total = updates.sum
+        ))
+      }
+    }
+
+    // Process taskStageAccumMap to get all the accumulators
+    val stageLevelAccums = app.taskStageAccumMap.values.flatten
+    val groupedByAccumulatorId = stageLevelAccums.groupBy(_.accumulatorId)
+    groupedByAccumulatorId.flatMap { case (accumulatorId, accums) =>
+      // Extract and sort the update values, skipping accumulators that have no update value
+      val sortedUpdates = accums.flatMap(_.update).toSeq.sorted
+
+      // Compute the statistics for the accumulator if applicable
+      computeStatistics(sortedUpdates).map { stats =>
+        val sampleAccum = accums.head
+        AccumProfileResults(
+          appIndex = appIndex,
+          stageId = sampleAccum.stageId.toString,
+          accumulatorId = accumulatorId,
+          name = sampleAccum.name.getOrElse("Unknown"),
+          min = stats.min,
+          median = stats.med,
+          max = stats.max,
+          total = stats.total
+        )
+      }
+    }.toSeq
+  }
 }
 
 object AppSQLPlanAnalyzer {
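To make the statistics step above easier to follow, here is a minimal, self-contained sketch of the same aggregation (illustrative only; the object and method names are not part of the patch). Like the patch, it drops accumulators with no update values and reports only the total when there is exactly one update:

    object StageAccumStatsSketch {
      case class Stats(min: Long, med: Long, max: Long, total: Long)

      def computeStats(updates: Seq[Long]): Option[Stats] = {
        // sort once so head/last give min/max and the middle element serves as the median
        val sorted = updates.sorted
        if (sorted.isEmpty) {
          None
        } else if (sorted.length == 1) {
          Some(Stats(0L, 0L, 0L, sorted.sum))
        } else {
          Some(Stats(sorted.head, sorted(sorted.size / 2), sorted.last, sorted.sum))
        }
      }

      def main(args: Array[String]): Unit = {
        println(computeStats(Seq(7L, 2L, 11L, 5L))) // Some(Stats(2,7,11,25))
        println(computeStats(Seq(3L)))              // Some(Stats(0,0,0,3))
        println(computeStats(Seq.empty))            // None
      }
    }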
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
index a5692c261..6a5b197db 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala
@@ -26,6 +26,7 @@ case class ApplicationSummaryInfo(
     rapidsProps: Seq[RapidsPropertyProfileResult],
     rapidsJar: Seq[RapidsJarProfileResult],
     sqlMetrics: Seq[SQLAccumProfileResults],
+    stageMetrics: Seq[AccumProfileResults],
     jobAggMetrics: Seq[JobAggTaskMetricsProfileResult],
     stageAggMetrics: Seq[StageAggTaskMetricsProfileResult],
     sqlTaskAggMetrics: Seq[SQLTaskAggMetricsProfileResult],
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala
index 44d566236..ba2c45dbf 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala
@@ -84,6 +84,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
     ProfSQLPlanMetricsView.getRawView(apps)
   }
 
+  // Print all Stage level Metrics
+  def getStageLevelMetrics: Seq[AccumProfileResults] = {
+    ProfStageMetricView.getRawView(apps)
+  }
+
   /**
    * This function is meant to clean up Delta log execs so that you could align
    * SQL ids between CPU and GPU eventlogs. It attempts to remove any delta log
@@ -103,6 +108,12 @@ object CollectInformation extends Logging {
     }
   }
 
+  def generateStageLevelAccums(apps: Seq[ApplicationInfo]): Seq[AccumProfileResults] = {
+    apps.flatMap { app =>
+      app.planMetricProcessor.generateStageLevelAccums()
+    }
+  }
+
   def printSQLPlans(apps: Seq[ApplicationInfo], outputDir: String): Unit = {
     for (app <- apps) {
       val planFileWriter = new ToolTextFileWriter(s"$outputDir/${app.appId}",
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
index e6d83c27e..d6e369ef8 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala
@@ -219,6 +219,23 @@ case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
   }
 }
 
+case class AccumProfileResults(appIndex: Int, stageId: String, accumulatorId: Long, name: String,
+    min: Long, median: Long, max: Long, total: Long) extends ProfileResult {
+  override val outputHeaders = Seq("appIndex", "stageId", "accumulatorId", "name", "min",
+    "median", "max", "total")
+
+  override def convertToSeq: Seq[String] = {
+    Seq(appIndex.toString, stageId, accumulatorId.toString, name, min.toString, median.toString,
+      max.toString, total.toString)
+  }
+
+  override def convertToCSVSeq: Seq[String] = {
+    Seq(appIndex.toString, StringUtils.reformatCSVString(stageId), accumulatorId.toString,
+      StringUtils.reformatCSVString(name), min.toString, median.toString, max.toString,
+      total.toString)
+  }
+}
+
 case class ResourceProfileInfoCase(
     val resourceProfileId: Int,
     val executorResources: Map[String, ExecutorResourceRequest],
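For reference, rows of the new AccumProfileResults type render through the same text/CSV path as the other ProfileResult tables. The standalone sketch below (simplified quoting compared to StringUtils.reformatCSVString, with made-up sample values) shows the expected column layout:

    object AccumRowSketch {
      private def quote(s: String): String = "\"" + s + "\""

      // mirrors outputHeaders/convertToCSVSeq in the case class above (illustrative only)
      def toCsvRow(appIndex: Int, stageId: String, accumulatorId: Long, name: String,
          min: Long, median: Long, max: Long, total: Long): String = {
        Seq(appIndex.toString, quote(stageId), accumulatorId.toString, quote(name),
          min.toString, median.toString, max.toString, total.toString).mkString(",")
      }

      def main(args: Array[String]): Unit = {
        println("appIndex,stageId,accumulatorId,name,min,median,max,total")
        println(toCsvRow(1, "10", 1010L, "gpuSemaphoreWait", 120L, 250L, 492L, 3500L))
      }
    }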
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
index dd8aa5470..c2eaa6afe 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala
@@ -329,6 +329,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
       val systemProps = collect.getSystemProperties
       val rapidsJar = collect.getRapidsJARInfo
       val sqlMetrics = collect.getSQLPlanMetrics
+      val stageMetrics = collect.getStageLevelMetrics
       val wholeStage = collect.getWholeStageCodeGenMapping
       // for compare mode we just add in extra tables for matching across applications
       // the rest of the tables simply list all applications specified
@@ -392,7 +393,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
       logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
       (ApplicationSummaryInfo(appInfo, dsInfo,
         collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
-        rapidsJar, sqlMetrics, analysis.jobAggs, analysis.stageAggs,
+        rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
         analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
         failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
         unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
@@ -471,6 +472,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
       combineProps("rapids", appsSum).sortBy(_.key),
       appsSum.flatMap(_.rapidsJar).sortBy(_.appIndex),
       appsSum.flatMap(_.sqlMetrics).sortBy(_.appIndex),
+      appsSum.flatMap(_.stageMetrics).sortBy(_.appIndex),
       appsSum.flatMap(_.jobAggMetrics).sortBy(_.appIndex),
       appsSum.flatMap(_.stageAggMetrics).sortBy(_.appIndex),
       appsSum.flatMap(_.sqlTaskAggMetrics).sortBy(_.appIndex),
@@ -513,6 +515,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
       Some(ProfRapidsJarView.getDescription))
     profileOutputWriter.write(ProfSQLPlanMetricsView.getLabel, app.sqlMetrics,
       Some(ProfSQLPlanMetricsView.getDescription))
+    profileOutputWriter.write(ProfStageMetricView.getLabel, app.stageMetrics,
+      Some(ProfStageMetricView.getDescription))
     profileOutputWriter.write(ProfSQLCodeGenView.getLabel, app.wholeStage,
       Some(ProfSQLCodeGenView.getDescription))
     comparedRes.foreach { compareSum =>
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/AppStageMetricsView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/AppStageMetricsView.scala
new file mode 100644
index 000000000..e343ee7ea
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/AppStageMetricsView.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.views
+
+import com.nvidia.spark.rapids.tool.analysis.{AppSQLPlanAnalyzer, ProfAppIndexMapperTrait}
+import com.nvidia.spark.rapids.tool.profiling.AccumProfileResults
+
+import org.apache.spark.sql.rapids.tool.AppBase
+import org.apache.spark.sql.rapids.tool.annotation.Since
+import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
+
+trait AppStageMetricsViewTrait extends ViewableTrait[AccumProfileResults] {
+  override def getLabel: String = "Stage Level All Metrics"
+  override def getDescription: String = "Stage Level Metrics"
+
+  override def sortView(
+      rows: Seq[AccumProfileResults]): Seq[AccumProfileResults] = {
+    rows.sortBy(cols => (cols.stageId, cols.appIndex, cols.accumulatorId))
+  }
+}
+
+@Since("24.06.2")
+object ProfStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {
+
+  override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
+    app match {
+      case app: ApplicationInfo =>
+        app.planMetricProcessor.generateStageLevelAccums()
+      case _ => Seq.empty
+    }
+  }
+}
+
+object QualStageMetricView extends AppStageMetricsViewTrait with ProfAppIndexMapperTrait {
+  // Kept for a follow-up refactor to customize the view based on the app type (Qual/Prof)
+  override def getRawView(app: AppBase, index: Int): Seq[AccumProfileResults] = {
+    Seq.empty
+  }
+
+  def getRawViewFromSqlProcessor(
+      sqlAnalyzer: AppSQLPlanAnalyzer): Seq[AccumProfileResults] = {
+    sortView(sqlAnalyzer.generateStageLevelAccums())
+  }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala
index 61c78c470..27455d9e9 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala
@@ -75,6 +75,9 @@ object QualRawReportGenerator {
       pWriter.write(QualExecutorView.getLabel, QualExecutorView.getRawView(Seq(app)))
       pWriter.write(QualAppJobView.getLabel, QualAppJobView.getRawView(Seq(app)))
       generateSQLProcessingView(pWriter, sqlPlanAnalyzer)
+      pWriter.write(QualStageMetricView.getLabel,
+        QualStageMetricView.getRawViewFromSqlProcessor(sqlPlanAnalyzer),
+        Some(QualStageMetricView.getDescription))
       pWriter.write(RapidsQualPropertiesView.getLabel,
         RapidsQualPropertiesView.getRawView(Seq(app)),
         Some(RapidsQualPropertiesView.getDescription))
"""{"Event":"SparkListenerLogStart","Spark Version":"3.2.1"} + |{"Event":"SparkListenerApplicationStart","App Name":"GPUMetrics", "App ID":"local-16261043003", "Timestamp":123456, "User":"User1"} + |{"Event":"SparkListenerTaskEnd","Stage ID":10,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5073,"Index":5054,"Attempt":0,"Partition ID":5054,"Launch Time":1712248533994,"Executor ID":"100","Host":"10.154.65.143","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1712253284920,"Failed":false,"Killed":false,"Accumulables":[{"ID":1010,"Name":"gpuSemaphoreWait","Update":"00:00:00.492","Value":"03:13:31.359","Internal":false,"Count Failed Values":true},{"ID":1018,"Name":"gpuSpillToHostTime","Update":"00:00:00.845","Value":"00:29:39.521","Internal":false,"Count Failed Values":true},{"ID":1016,"Name":"gpuSplitAndRetryCount","Update":"1","Value":"2","Internal":false,"Count Failed Values":true}]}}""".stripMargin + // scalastyle:on line.size.limit + Files.write(eventLogFilePath, eventLogContent.getBytes(StandardCharsets.UTF_8)) + + val profileArgs = Array( + "--output-directory", outputDir.getAbsolutePath, + eventLogFilePath.toString + ) + + val appArgs = new ProfileArgs(profileArgs) + val (exit, _) = ProfileMain.mainInternal(appArgs) + assert(exit == 0) + + val apps = ArrayBuffer[ApplicationInfo]() + var index = 1 + + val eventLogPaths = appArgs.eventlog() + eventLogPaths.foreach { path => + val eventLogInfo = EventLogPathProcessor.getEventLogInfo(path, hadoopConf).head._1 + apps += new ApplicationInfo(hadoopConf, eventLogInfo, index) + index += 1 + } + assert(apps.size == 1) + + val collect = new CollectInformation(apps) + val gpuMetrics = collect.getStageLevelMetrics + + // Sample eventlog has 3 gpu metrics, gpuSemaphoreWait, + // gpuSpillToHostTime, gpuSplitAndRetryCount + assert(gpuMetrics.size == 3) + val gpuSemaphoreWait = gpuMetrics.find(_.name == "gpuSemaphoreWait") + assert(gpuSemaphoreWait.isDefined) + } + } + } + test("test printSQLPlans") { TrampolineUtil.withTempDir { tempOutputDir => val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]() @@ -790,7 +835,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 19) + assert(dotDirs.length === 20) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -824,7 +869,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 15) + assert(dotDirs.length === 16) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -861,7 +906,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 19) + assert(dotDirs.length === 20) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly @@ -898,7 +943,7 @@ class ApplicationInfoSuite extends FunSuite with Logging { f.endsWith(".csv") }) // compare the number of files generated - assert(dotDirs.length === 17) + assert(dotDirs.length === 18) for (file <- dotDirs) { assert(file.getAbsolutePath.endsWith(".csv")) // just load each one to make sure formatted properly