Add code for generating dot file visualizations (NVIDIA#2449)
andygrove authored May 26, 2021
1 parent 6082086 commit ed10c45
Showing 8 changed files with 419 additions and 10 deletions.
2 changes: 1 addition & 1 deletion rapids-4-spark-tools/pom.xml
@@ -132,7 +132,7 @@
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-<mainClass>org.apache.spark.sql.rapids.tool.profiling.ProfileMain</mainClass>
+<mainClass>com.nvidia.spark.rapids.tool.profiling.ProfileMain</mainClass>
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala
@@ -15,10 +15,16 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import java.io.File
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo


/**
* CollectInformation mainly prints information from the event log,
* such as executors, parameters, etc.
@@ -72,4 +78,39 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {
messageHeader = messageHeader)
}
}

def generateDot(): Unit = {
for (app <- apps) {
val requiredDataFrames = Seq("sqlMetricsDF", "driverAccumDF",
"taskStageAccumDF")
.map(name => s"${name}_${app.index}")
if (requiredDataFrames.forall(app.allDataFrames.contains)) {
val accums = app.runQuery(app.generateSQLAccums)
val start = System.nanoTime()
val accumSummary = accums
.select(col("sqlId"), col("accumulatorId"), col("max_value"))
.collect()
val map = new mutable.HashMap[Long, ArrayBuffer[(Long, Long)]]()
for (row <- accumSummary) {
val list = map.getOrElseUpdate(row.getLong(0), new ArrayBuffer[(Long, Long)]())
list += row.getLong(1) -> row.getLong(2)
}
val outDir = new File(app.args.outputDirectory())
for ((sqlID, planInfo) <- app.sqlPlan) {
val fileDir = new File(outDir, s"${app.appId}-query-$sqlID")
fileDir.mkdirs()
val metrics = map.getOrElse(sqlID, Seq.empty).toMap
GenerateDot.generateDotGraph(
QueryPlanWithMetrics(planInfo, metrics), None, fileDir, sqlID + ".dot")
}
val duration = TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
fileWriter.write(s"Generated DOT graphs for app ${app.appId} " +
s"to ${outDir.getAbsolutePath} in $duration second(s)\n")
} else {
val missingDataFrames = requiredDataFrames.filterNot(app.allDataFrames.contains)
fileWriter.write(s"Could not generate DOT graph for app ${app.appId} " +
s"because of missing data frames: ${missingDataFrames.mkString(", ")}\n")
}
}
}
}
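For reference, the loop above writes one DOT file per SQL query, each in its own subdirectory of the tool's output directory; with illustrative application and query IDs, the resulting layout looks roughly like:

<outputDirectory>/app-20210526120000-0001-query-0/0.dot
<outputDirectory>/app-20210526120000-0001-query-1/1.dot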
rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/GenerateDot.scala
@@ -0,0 +1,196 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.tool.profiling

import java.io.{File, FileWriter}
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.metric.SQLMetricInfo

/**
* Generate a DOT graph for one query plan, or show the differences between two query plans.
*
* Diff mode is intended for comparing query plans that are expected to have the same
* structure, such as two different runs of the same query but with different tuning options.
*
* When running in diff mode, any differences in SQL metrics are shown. If the plans
* diverge structurally, the graph marks the point of divergence and does not recurse
* further.
*
* Graphviz and other tools can be used to generate images from DOT files.
*
* See https://graphviz.org/pdf/dotguide.pdf for a description of DOT files.
*/
object GenerateDot {
private val GPU_COLOR = "#76b900" // NVIDIA Green
private val CPU_COLOR = "#0071c5"
private val TRANSITION_COLOR = "red"

/**
* Generate a query plan visualization in dot format.
*
* @param plan First query plan and metrics
* @param comparisonPlan Optional second query plan and metrics
* @param dir Directory to write the dot file to
* @param filename Filename to write dot graph to
*/
def generateDotGraph(
plan: QueryPlanWithMetrics,
comparisonPlan: Option[QueryPlanWithMetrics],
dir: File,
filename: String): Unit = {

var nextId = 1

def isGpuPlan(plan: SparkPlanInfo): Boolean = {
plan.nodeName match {
case name if name contains "QueryStage" =>
plan.children.isEmpty || isGpuPlan(plan.children.head)
case name if name == "ReusedExchange" =>
plan.children.isEmpty || isGpuPlan(plan.children.head)
case name =>
name.startsWith("Gpu")
}
}

def formatMetric(m: SQLMetricInfo, value: Long): String = {
val formatter = java.text.NumberFormat.getIntegerInstance
m.metricType match {
case "timing" =>
val ms = value
s"${formatter.format(ms)} ms"
case "nsTiming" =>
val ms = TimeUnit.NANOSECONDS.toMillis(value)
s"${formatter.format(ms)} ms"
case _ =>
s"${formatter.format(value)}"
}
}

/** Recursively graph the operator nodes in the spark plan */
def writeGraph(
w: FileWriter,
node: QueryPlanWithMetrics,
comparisonNode: QueryPlanWithMetrics,
id: Int = 0): Unit = {

val nodePlan = node.plan
val comparisonPlan = comparisonNode.plan
if (nodePlan.nodeName == comparisonPlan.nodeName &&
nodePlan.children.length == comparisonPlan.children.length) {

val metricNames = (nodePlan.metrics.map(_.name) ++
comparisonPlan.metrics.map(_.name)).distinct.sorted

val metrics = metricNames.flatMap(name => {
val l = nodePlan.metrics.find(_.name == name)
val r = comparisonPlan.metrics.find(_.name == name)
(l, r) match {
case (Some(metric1), Some(metric2)) =>
(node.metrics.get(metric1.accumulatorId),
comparisonNode.metrics.get(metric1.accumulatorId)) match {
case (Some(value1), Some(value2)) =>
if (value1 == value2) {
Some(s"$name: ${formatMetric(metric1, value1)}")
} else {
metric1.metricType match {
case "nsTiming" | "timing" =>
val pctStr = createPercentDiffString(value1, value2)
Some(s"$name: ${formatMetric(metric1, value1)} / " +
s"${formatMetric(metric2, value2)} ($pctStr %)")
case _ =>
Some(s"$name: ${formatMetric(metric1, value1)} / " +
s"${formatMetric(metric2, value2)}")
}
}
case _ => None
}
case _ => None
}
}).mkString("\n")

val color = if (isGpuPlan(nodePlan)) { GPU_COLOR } else { CPU_COLOR }

val label = if (nodePlan.nodeName.contains("QueryStage")) {
nodePlan.simpleString
} else {
nodePlan.nodeName
}

val nodeText =
s"""node$id [shape=box,color="$color",style="filled",
|label = "$label\n
|$metrics"];
|""".stripMargin

w.write(nodeText)
nodePlan.children.indices.foreach(i => {
val childId = nextId
nextId += 1
writeGraph(
w,
QueryPlanWithMetrics(nodePlan.children(i), node.metrics),
QueryPlanWithMetrics(comparisonPlan.children(i), comparisonNode.metrics),
childId)

val style = (isGpuPlan(nodePlan), isGpuPlan(nodePlan.children(i))) match {
case (true, true) => s"""color="$GPU_COLOR""""
case (false, false) => s"""color="$CPU_COLOR""""
case _ =>
// show emphasis on transitions between CPU and GPU
s"color=$TRANSITION_COLOR, style=bold"
}
w.write(s"node$childId -> node$id [$style];\n")
})
} else {
// plans have diverged - cannot recurse further
w.write(
s"""node$id [shape=box, color=red,
|label = "plans diverge here:
|${nodePlan.nodeName} vs ${comparisonPlan.nodeName}"];\n""".stripMargin)
}
}

// write the dot graph to a file
val file = new File(dir, filename)
val w = new FileWriter(file)
try {
w.write("digraph G {\n")
writeGraph(w, plan, comparisonPlan.getOrElse(plan), 0)
w.write("}\n")
} finally {
w.close()
}
}

/** Format the signed percent change from n1 to n2, e.g. 200 vs 150 yields "-25.0". */
private def createPercentDiffString(n1: Long, n2: Long): String = {
val pct = (n2 - n1) * 100.0 / n1
val pctStr = if (pct < 0) {
f"$pct%.1f"
} else {
f"+$pct%.1f"
}
pctStr
}
}

/**
* Query plan with metrics.
*
* @param plan Query plan.
* @param metrics Map of accumulatorId to metric value.
*/
case class QueryPlanWithMetrics(plan: SparkPlanInfo, metrics: Map[Long, Long])
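As a rough, self-contained sketch of driving the generator above directly (not part of this commit — the example object, plan shape, metric name, and accumulator id are all invented for illustration), a hand-built SparkPlanInfo can be passed straight to GenerateDot.generateDotGraph:

import java.io.File

import com.nvidia.spark.rapids.tool.profiling.{GenerateDot, QueryPlanWithMetrics}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.metric.SQLMetricInfo

object GenerateDotExample {
  def main(args: Array[String]): Unit = {
    // A two-node plan: a GPU project on top of a GPU filter (names are illustrative)
    val opTime = new SQLMetricInfo("op time", 1L, "nsTiming")
    val filter = new SparkPlanInfo("GpuFilter", "GpuFilter (a > 0)",
      Seq.empty, Map.empty, Seq(opTime))
    val project = new SparkPlanInfo("GpuProject", "GpuProject [a]",
      Seq(filter), Map.empty, Seq.empty)
    // Accumulator 1 holds ~123 ms of operator time ("nsTiming" values are nanoseconds)
    val plan = QueryPlanWithMetrics(project, Map(1L -> 123000000L))
    // Writes /tmp/example.dot; pass Some(otherPlan) instead of None for a diff graph
    GenerateDot.generateDotGraph(plan, None, new File("/tmp"), "example.dot")
  }
}

The resulting file can then be rendered with Graphviz, e.g. dot -Tpng /tmp/example.dot -o example.png.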
rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala
@@ -59,5 +59,9 @@ For usage see below:
val numOutputRows: ScallopOption[Int] =
opt[Int](required = false,
descr = "Number of output rows for each Application. Default is 1000")
val generateDot: ScallopOption[Boolean] =
opt[Boolean](required = false,
descr = "Generate query visualizations in DOT format. Default is false")

verify()
}
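Assuming Scallop's default camelCase-to-kebab-case option naming, generateDot surfaces as --generate-dot on the command line; a hypothetical invocation (the jar name, version, and event log path are placeholders) would look like:

spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain \
  rapids-4-spark-tools_2.12-<version>.jar \
  --generate-dot /path/to/eventlog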
rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileMain.scala
@@ -18,31 +18,42 @@ package com.nvidia.spark.rapids.tool.profiling

import java.io.FileWriter

+import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
-import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.rapids.tool.profiling._

/**
* A profiling tool to parse Spark event logs.
* This is the main entry point.
*/
object ProfileMain extends Logging {
/**
* Entry point from spark-submit running this as the driver.
*/
def main(args: Array[String]) {
val sparkSession = ProfileUtils.createSparkSession
val exitCode = mainInternal(sparkSession, new ProfileArgs(args))
if (exitCode != 0) {
System.exit(exitCode)
}
}

/**
* Entry point for tests
*/
def mainInternal(sparkSession: SparkSession, appArgs: ProfileArgs): Int = {

// This tool's output log file name
val logFileName = "rapids_4_spark_tools_output.log"

-// Parsing args
-val appArgs = new ProfileArgs(args)
val eventlogPaths = appArgs.eventlog()
val outputDirectory = appArgs.outputDirectory().stripSuffix("/")

// Create the FileWriter and sparkSession used for ALL Applications.
val fileWriter = new FileWriter(s"$outputDirectory/$logFileName")
-val sparkSession = ProfileUtils.createSparkSession
logInfo(s"Output directory: $outputDirectory")

// Convert the input path string to Path(s)
@@ -68,9 +79,9 @@ object ProfileMain extends Logging {
// Exit if there are no applications to process.
if (apps.isEmpty) {
logInfo("No application to process. Exiting")
-System.exit(0)
+return 0
}
-processApps(apps)
+processApps(apps, generateDot = false)
// Show the application Id <-> appIndex mapping.
for (app <- apps) {
logApplicationInfo(app)
@@ -84,7 +95,7 @@
val app = new ApplicationInfo(appArgs, sparkSession, fileWriter, path, index)
apps += app
logApplicationInfo(app)
-processApps(apps)
+processApps(apps, appArgs.generateDot())
app.dropAllTempViews()
index += 1
}
@@ -100,7 +111,7 @@
* evaluated at once and the output is one row per application. Else each eventlog is parsed one
* at a time.
*/
-def processApps(apps: ArrayBuffer[ApplicationInfo]): Unit = {
+def processApps(apps: ArrayBuffer[ApplicationInfo], generateDot: Boolean): Unit = {
if (appArgs.compare()) { // Compare Applications
logInfo(s"### A. Compare Information Collected ###")
val compare = new CompareApplications(apps)
@@ -113,6 +124,9 @@
collect.printAppInfo()
collect.printExecutorInfo()
collect.printRapidsProperties()
if (generateDot) {
collect.generateDot()
}
}

logInfo(s"### B. Analysis ###")
@@ -133,5 +147,7 @@
logInfo(s"============== ${app.appId} (index=${app.index}) ==============")
logInfo("========================================================================")
}

0
}
}
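Since mainInternal now returns an exit code instead of calling System.exit, a test can drive it directly. A minimal sketch, assuming an existing SparkSession named sparkSession and a placeholder event log path (the --output-directory spelling again assumes Scallop's default naming):

val appArgs = new ProfileArgs(Array(
  "--output-directory", "/tmp/profile-out",
  "/path/to/eventlog"))
val exitCode = ProfileMain.mainInternal(sparkSession, appArgs)
assert(exitCode == 0)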