Qualification tool updates for datasets, udf, and misc fixes (NVIDIA#2505)

* Start looking for datasets, udfs, etc

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* more qualification

* Add utility to generate eventlog using Dataset api

* Add function to generate event log with UDF

* Add test utilities and files for udf and dataset

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* cleanup

* cleanup

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Add some testing utils dealing with directories

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* remove log

* spacing updates

* fix spacing

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Update rapids-4-spark-tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Qualification.scala

Co-authored-by: Andy Grove <andygrove73@gmail.com>

* Update rapids-4-spark-tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/QualificationInfoUtils.scala

Co-authored-by: Niranjan Artal <50492963+nartal1@users.noreply.github.com>

* Update rapids-4-spark-tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/QualificationInfoUtils.scala

Co-authored-by: Niranjan Artal <50492963+nartal1@users.noreply.github.com>

* fix extra else

Co-authored-by: Andy Grove <andygrove73@gmail.com>
Co-authored-by: Niranjan Artal <50492963+nartal1@users.noreply.github.com>
3 people authored May 26, 2021
1 parent 79ee29a commit 6d14296
Showing 11 changed files with 408 additions and 25 deletions.
20 changes: 10 additions & 10 deletions rapids-4-spark-tools/README.md
@@ -6,31 +6,31 @@ Works with both cpu and gpu generated event logs.

(The code is based on Apache Spark 3.1.1 source code, and tested using Spark 3.0.x and 3.1.1 event logs)

## Prerequisites
1. Spark 3.1.1 or newer installed

## How to compile and use with Spark
1. `mvn clean package`
2. Copy rapids-4-spark-tools-<version>.jar to $SPARK_HOME/jars/

`cp target/rapids-4-spark-tools-21.06.0-SNAPSHOT.jar $SPARK_HOME/jars/`

3. In spark-shell:
2. Include rapids-4-spark-tools-<version>.jar in the '--jars' option to spark-shell or spark-submit
3. After starting spark-shell:
For a single event log analysis:
```
org.apache.spark.sql.rapids.tool.profiling.ProfileMain.main(Array("/path/to/eventlog1"))
com.nvidia.spark.rapids.tool.profiling.ProfileMain.main(Array("/path/to/eventlog1"))
```

For multiple event logs comparison and analysis:
```
org.apache.spark.sql.rapids.tool.profiling.ProfileMain.main(Array("/path/to/eventlog1", "/path/to/eventlog2"))
com.nvidia.spark.rapids.tool.profiling.ProfileMain.main(Array("/path/to/eventlog1", "/path/to/eventlog2"))
```
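For step 2 above, a spark-shell launch might look like this (the jar path is illustrative and should point at your build output):
```
./bin/spark-shell --jars <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar
```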

## How to compile and use from command-line
1. `mvn clean package`
2. `cd $SPARK_HOME (Download Apache Spark if required)`
3. `./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1`
3. `./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar /path/to/eventlog1`

## Options
```
$ ./bin/spark-submit --class org.apache.spark.sql.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar --help
$ ./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools-<version>.jar --help
Spark event log profiling tool
@@ -112,4 +112,4 @@ We can input multiple Spark event logs and this tool can compare environments, ex
|spark.rapids.sql.incompatibleOps.enabled |null |true |
|spark.rapids.sql.variableFloatAgg.enabled |null |TRUE |
+-------------------------------------------+----------+----------+
```
```
@@ -155,4 +155,4 @@ case class TaskCase(
output_bytesWritten: Long,
output_recordsWritten: Long)

case class ProblematicSQLCase(sqlID: Long)
case class ProblematicSQLCase(sqlID: Long, reason: String, desc: String)
@@ -150,4 +150,4 @@ object ProfileMain extends Logging {

0
}
}
}
@@ -52,19 +52,28 @@ class Qualification(
if (!app.allDataFrames.contains(s"sqlDF_${app.index}")) {
logInfo(s"${app.appId} (index=${app.index}) is disqualified because no SQL is inside.")
fileWriter.write(s"${app.appId} (index=${app.index}) is " +
s"disqualified because no SQL is inside.\n")
s"disqualified because no SQL is inside.\n")
return false
}

val dfProb = app.queryToDF(app.qualificationSQLDataSet)
if (!dfProb.isEmpty) {
logInfo(s"${app.appId} (index=${app.index}) is disqualified because it is problematic " +
"(UDF, Dataset, etc).")
fileWriter.write(s"${app.appId} (index=${app.index}) is " +
s"disqualified because problematic (UDF, Dataset, etc.)\n")
fileWriter.write("Reason disqualified:\n")
fileWriter.write(ToolUtils.showString(dfProb, app.args.numOutputRows.getOrElse(1000)))
}
val df = app.queryToDF(app.qualificationSQL)
if (df.isEmpty) {
logInfo(s"${app.appId} (index=${app.index}) is disqualified because no SQL is qualified.")
fileWriter.write(s"${app.appId} (index=${app.index}) is " +
s"disqualified because no SQL is qualified.\n")
s"disqualified because no SQL is qualified\n")
false
} else {
fileWriter.write(s"${app.appId} (index=${app.index}) " +
s"is qualified with below qualified SQL(s):\n")
s"is qualified with below qualified SQL(s):\n")
fileWriter.write("\n" + ToolUtils.showString(df, app.args.numOutputRows.getOrElse(1000)))
true
}
@@ -249,8 +249,9 @@ class ApplicationInfo(
val allnodes = planGraph.allNodes
for (node <- allnodes){
// First identify problematic SQLs, if there are any
if (isProblematicPlan(node)){
problematicSQL += ProblematicSQLCase(sqlID)
val probReason = isProblematicPlan(node)
if (probReason.nonEmpty) {
problematicSQL += ProblematicSQLCase(sqlID, probReason, node.desc)
}
// Then process SQL plan metric type
for (metric <- node.metrics){
@@ -664,9 +665,33 @@ class ApplicationInfo(
|""".stripMargin
}

// Function to generate a query for qualification
def qualificationSQLDataSet: String = {
s"""select distinct(sqlID), reason, desc from problematicSQLDF_$index
|""".stripMargin
}

// Function to determine if a SparkPlanGraphNode could be problematic.
def isProblematicPlan(node: SparkPlanGraphNode): Boolean = {
node.name == "GpuColumnarToRow" || node.name == "GpuRowToColumnar" ||
(node.desc matches ".*\\$Lambda\\$.*")
// // todo what about scan for generated data? -> SerializeFromObject?
// LocalTableScan?
// these may only apply to profiling part so take out for now
// if (node.name == "GpuColumnarToRow") {
// "GpuColumnarToRow"
// } else if (node.name == "GpuRowToColumnar") {
// "GpuRowToColumnar"
def isProblematicPlan(node: SparkPlanGraphNode): String = {
logWarning("node description is: " + node.desc)
isDescProblematic(node.desc)
}

// TODO - do we want to handle existing RDD
// case e if e.matches(".*ExistingRDD.*") => "existingRDD"
private def isDescProblematic(desc: String): String = {
desc match {
case l if l.matches(".*\\$Lambda\\$.*") => "Dataset/Lambda"
case u if u.matches(".*UDF.*") => "UDF"
case a if a.endsWith(".apply") => "Dataset/Apply"
case _ => ""
}
}
}
}
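As a rough, standalone sketch of how the regex rules above classify plan-node descriptions (the `classify` helper and the sample description strings are hypothetical, added only for illustration; note that for `index = 1` the qualification query above expands to `select distinct(sqlID), reason, desc from problematicSQLDF_1`):

```scala
object DescClassifierExample {
  // Hypothetical copy of the matching rules in isDescProblematic, for illustration only.
  def classify(desc: String): String = desc match {
    case l if l.matches(".*\\$Lambda\\$.*") => "Dataset/Lambda"
    case u if u.matches(".*UDF.*")          => "UDF"
    case a if a.endsWith(".apply")          => "Dataset/Apply"
    case _                                  => ""
  }

  def main(args: Array[String]): Unit = {
    // Made-up plan-node descriptions and the label the rules would assign to each.
    Seq(
      "MapElements com.example.Job$$Lambda$1234/0x00000001",   // -> Dataset/Lambda
      "Project [UDF(country#10) AS normalisedCountry#12]",     // -> UDF
      "DeserializeToObject com.example.RapidsFriends$.apply",  // -> Dataset/Apply
      "WholeStageCodegen (1)"                                  // -> "" (not problematic)
    ).foreach(d => println(s"'${classify(d)}' <- $d"))
  }
}
```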
38 changes: 38 additions & 0 deletions rapids-4-spark-tools/src/test/resources/dataset_eventlog

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions rapids-4-spark-tools/src/test/resources/udf_dataset_eventlog

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions rapids-4-spark-tools/src/test/resources/udf_func_eventlog

Large diffs are not rendered by default.

@@ -0,0 +1,180 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

import java.io.File

import org.apache.spark.internal.Logging
import org.apache.spark.rapids.TestUtils
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// class used for testing
case class RapidsFriends(name: String, friend: String, age: Int)

/**
* Utilities to generate event logs used for qualification testing.
*/
object QualificationInfoUtils extends Logging {

def randomStringFromCharList(length: Int, chars: Seq[Char]): String = {
val sb = new StringBuilder
for (i <- 1 to length) {
val randomNum = util.Random.nextInt(chars.length)
sb.append(chars(randomNum))
}
sb.toString
}

def randomAlpha(length: Int): String = {
val chars = ('a' to 'z') ++ ('A' to 'Z')
randomStringFromCharList(length, chars)
}

def randomString(length: Int) = {
val r = new scala.util.Random
val sb = new StringBuilder
for (i <- 1 to length) {
sb.append(r.nextPrintableChar)
}
sb.toString
}

val randForInt = new scala.util.Random(11)
def randomInt(): Int = {
randForInt.nextInt(100)
}

def generateFriendsDataset(spark: SparkSession): Dataset[RapidsFriends] = {
import spark.implicits._
val df = spark.sparkContext.parallelize(
Seq.fill(1000){(randomAlpha(10), randomAlpha(5), randomInt)})
.toDF("name", "friend", "age")
df.as[RapidsFriends]
}

// dataset operations in plan show up as Lambda
def genDatasetEventLog(spark: SparkSession, size: Int = 1000) = {
import spark.implicits._
TestUtils.withTempPath { jsonOutFile =>
val ds = generateFriendsDataset(spark)
val dsAge = ds.filter(d => d.age > 25).map(d => (d.friend, d.age))
dsAge.write.json(jsonOutFile.getCanonicalPath)
}
}

def parseAge = (age: Int) => {
val majorAge = Seq("21", "25", "55", "18")
if (majorAge.contains(age)) {
"MILESTONE"
} else {
"other"
}
}

// UDF with dataset, shows up with Lambda
def genUDFDSEventLog(spark: SparkSession, size: Int = 1000) = {
import spark.implicits._
TestUtils.withTempPath { jsonOutFile =>
val ageFunc = udf(parseAge)
val ds = generateFriendsDataset(spark)
ds.withColumn("ageCategory",ageFunc(col("age")))
val dsAge = ds.filter(d => d.age > 25).map(d => (d.friend, d.age))
dsAge.write.json(jsonOutFile.getCanonicalPath)
}
}

def cleanCountry = (country: String) => {
val allUSA = Seq("US", "USa", "USA", "United states", "United states of America")
if (allUSA.contains(country)) {
"USA"
} else {
"unknown"
}
}

// if the UDF is registered with the udf() function it shows up in the plan with UDF
// Registering udf like:
// val normaliseCountry = spark.udf.register("normalisedCountry",cleanCountry)
// doesn't seem to put anything unique in the plan
def genUDFFuncEventLog(spark: SparkSession, size: Int = 1000) = {
import spark.implicits._
TestUtils.withTempPath { jsonInputFile =>
TestUtils.withTempPath { jsonOutFile =>
val userData = spark.createDataFrame(Seq(
(1, "Chandler", "Pasadena", "US"),
(2, "Monica", "New york", "USa"),
(3, "Phoebe", "Suny", "USA"),
(4, "Rachael", "St louis", "United states of America"),
(5, "Joey", "LA", "Ussaa"),
(6, "Ross", "Detroit", "United states")
)).toDF("id", "name", "city", "country")
userData.write.json(jsonInputFile.getCanonicalPath)
val userDataRead = spark.read.json(jsonInputFile.getCanonicalPath)
val allUSA = Seq("US", "USa", "USA", "United states", "United states of America")
userDataRead.createOrReplaceTempView("user_data")
val cleanCountryUdf = udf(cleanCountry)
val resDf = userDataRead.withColumn("normalisedCountry", cleanCountryUdf(col("country")))
resDf.write.json(jsonOutFile.getCanonicalPath)
}
}
}

/*
* Example command:
* $SPARK_HOME/bin/spark-submit --master local[1] --driver-memory 30g \
* --jars ./rapids-4-spark-tools/target/rapids-4-spark-tools-21.06.0-SNAPSHOT-tests.jar,\
* ./rapids-4-spark-tools/target/rapids-4-spark-tools-21.06.0-SNAPSHOT.jar \
* --class com.nvidia.spark.rapids.tool.profiling.QualificationInfoUtils \
* ./rapids-4-spark-tools/target/rapids-4-spark-tools-21.06.0-SNAPSHOT-tests.jar udffunc \
* /tmp/testeventlogDir 100001
*/
def main(args: Array[String]): Unit = {
if (args.length == 0) {
println(s"ERROR: must specify a logType dataset, udfds, or udffunc")
System.exit(1)
}
val logType = args(0)
if (logType != "dataset" && logType != "udfds" && logType != "udffunc") {
println(s"ERROR: logType must be one of: dataset, udfds, or udffunc")
System.exit(1)
}
val eventDir = if (args.length > 1) args(1) else "/tmp/spark-eventLogTest"
val size = if (args.length > 2) args(2).toInt else 1000
val spark = {
SparkSession
.builder()
.master("local[*]")
.appName("Rapids Spark Profiling Tool Unit Tests")
.config("spark.eventLog.enabled", "true")
.config("spark.eventLog.dir", eventDir)
.getOrCreate()
}
import spark.implicits._
if (logType.toLowerCase.equals("dataset")) {
genDatasetEventLog(spark, size)
} else if (logType.toLowerCase.equals("udfds")) {
genUDFDSEventLog(spark, size)
} else if (logType.toLowerCase.equals("udffunc")) {
genUDFFuncEventLog(spark, size)
} else {
println(s"ERROR: Invalid log type specified: $logType")
System.exit(1)
}
spark.stop()
}
}
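A generated log can then be fed back through the tool's entry point exactly as the README shows; a minimal sketch (the path is a placeholder for whatever event log file appears under the directory passed above):

```scala
// Illustrative only: point ProfileMain at a generated event log.
com.nvidia.spark.rapids.tool.profiling.ProfileMain.main(Array("/path/to/eventlog1"))
```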
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids

import java.io.File

import org.apache.spark.util.Utils

/**
* Trampoline to get to the Spark Utils class, for use in testing.
*/
object TestUtils {

def withTempPath(f: File => Unit): Unit = {
val path = Utils.createTempDir()
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}
}
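A minimal usage sketch for `withTempPath` (the wrapper object, app name, and sample data are hypothetical and not part of this commit; it assumes a local SparkSession, as the generators above do):

```scala
package org.apache.spark.rapids.examples // hypothetical package for this sketch

import org.apache.spark.rapids.TestUtils
import org.apache.spark.sql.SparkSession

object WithTempPathExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("withTempPath sketch")
      .getOrCreate()
    import spark.implicits._
    TestUtils.withTempPath { dir =>
      // withTempPath deletes the directory up front, so Spark can create it as a fresh output path.
      Seq(("a", 1), ("b", 2)).toDF("key", "value").write.json(dir.getCanonicalPath)
      assert(spark.read.json(dir.getCanonicalPath).count() == 2)
    } // the directory is removed recursively afterwards, even if the body throws
    spark.stop()
  }
}
```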
4 changes: 0 additions & 4 deletions workload_profiling/.gitignore

This file was deleted.
