Qualification tool: Report write data format and nested types #3263

Merged Sep 3, 2021 (17 commits; diff shown from 15 commits)
14 changes: 10 additions & 4 deletions docs/additional-functionality/qualification-profiling-tools.md
@@ -60,7 +60,7 @@ Here are 2 options:
```bash
git clone https://github.com/NVIDIA/spark-rapids.git
cd spark-rapids
mvn -pl .,tools clean verify -DskipTests
mvn -Pdefault -pl .,tools clean verify -DskipTests
```
The jar is generated in the following directory:

@@ -144,14 +144,20 @@ you are running on needs to be taken into account.
had to estimate it, it means the event log was missing the application finished event, so we use
the end time of the last job or SQL execution we find to calculate the duration.

`Complex Types and Unsupported Nested Complex Types` looks at the Read Schema and reports whether it
contains any complex types (array, struct or map). Nested complex types are complex types that contain
other complex types (for example, array<struct<string,string>>).
Note that all schemas can be read for DataSource V1, but DataSource V2 truncates the schema, so if you
see `...` then the full schema is not available. For such schemas we read up to the `...` and report any
complex types and nested complex types found in that portion, as in the example below.
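
For example, DataSource V2 might report a truncated Read Schema like this (hypothetical):

```
city:string,address:struct<street:string,zip:str...
```

Everything before the `...` is still parsed, so `struct<street:string;zip:str` is reported as a
complex type (commas become `;` in the csv report); it is not counted as nested because no other
complex type appears inside the truncated brackets.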

Note that SQL queries that contain failed jobs are not included.

Sample output in csv:

```
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types
job3,app-20210507174503-1704,4320658.0,"",9569,4320658,26171,35.34,false,0,"",20,100.0,""
job1,app-20210507174503-2538,19864.04,"",6760,21802,83728,71.3,false,0,"",20,55.56,"Parquet[decimal]"
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
job3,app-20210507174503-1704,4320658.0,"",9569,4320658,26171,35.34,false,0,"",20,100.0,"",JSON,array<struct<city:string;state:string>>;map<string;string>,array<struct<city:string;state:string>>
job1,app-20210507174503-2538,19864.04,"",6760,21802,83728,71.3,false,0,"",20,55.56,"Parquet[decimal]",JSON;CSV,"",""
```

Sample output in text:
@@ -2019,10 +2019,12 @@ object SupportedOpsForTools {
val conf = new RapidsConf(Map.empty[String, String])
val types = TypeEnum.values.toSeq
val header = Seq("Format", "Direction") ++ types
val writeOps: Array[String] = Array.fill(types.size)("NA")
println(header.mkString(","))
GpuOverrides.fileFormats.toSeq.sortBy(_._1.toString).foreach {
case (format, ioMap) =>
val formatEnabled = format.toString.toLowerCase match {
val formatLowerCase = format.toString.toLowerCase
val formatEnabled = formatLowerCase match {
case "csv" => conf.isCsvEnabled && conf.isCsvReadEnabled
case "parquet" => conf.isParquetEnabled && conf.isParquetReadEnabled
case "orc" => conf.isOrcEnabled && conf.isOrcReadEnabled
@@ -2059,8 +2061,13 @@
read.support(t).text
}
}
// only support reads for now
// print read formats and types
println(s"${(Seq(format, "read") ++ readOps).mkString(",")}")

// print write formats and NA for types. Cannot determine types from event logs.
if (!formatLowerCase.equals("csv")) {
println(s"${(Seq(format, "write") ++ writeOps).mkString(",")}")
}
}
}

2 changes: 2 additions & 0 deletions tools/src/main/resources/supportedDataSource.csv
@@ -1,4 +1,6 @@
Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT
CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,CO,NA,NS,NA,NA,NA,NA,NA
ORC,read,S,S,S,S,S,S,S,S,PS,S,CO,NA,NS,NA,PS,NS,PS,NS
ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Parquet,read,S,S,S,S,S,S,S,S,PS,S,CO,NA,NS,NA,PS,PS,PS,NS
Parquet,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.qualification

import scala.collection.mutable.HashMap
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.{BufferedSource, Source}

/**
@@ -38,28 +38,35 @@ class PluginTypeChecker {
private val DEFAULT_DS_FILE = "supportedDataSource.csv"

// map of file format => Map[support category => Seq[Datatypes for that category]]
// contains the details of formats to which ones have datatypes not supported
// var for testing purposes
private var formatsToSupportedCategory = readSupportedTypesForPlugin
// contains the details of which formats have unsupported datatypes.
// writeFormats contains only the formats that are supported; types cannot be
// determined from event logs for write formats.
// var for testing purposes
private var (readFormatsAndTypes, writeFormats) = readSupportedTypesForPlugin

// for testing purposes only
def setPluginDataSourceFile(filePath: String): Unit = {
val source = Source.fromFile(filePath)
formatsToSupportedCategory = readSupportedTypesForPlugin(source)
val (readFormatsAndTypesTest, writeFormatsTest) = readSupportedTypesForPlugin(source)
readFormatsAndTypes = readFormatsAndTypesTest
writeFormats = writeFormatsTest
}

private def readSupportedTypesForPlugin: Map[String, Map[String, Seq[String]]] = {
private def readSupportedTypesForPlugin: (
Map[String, Map[String, Seq[String]]], ArrayBuffer[String]) = {
val source = Source.fromResource(DEFAULT_DS_FILE)
readSupportedTypesForPlugin(source)
}

// file format should be like this:
// Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,...
// CSV,read,S,S,S,S,S,S,S,S,S*,S,NS,NA,NS,NA,NA,NA,NA,NA
// Parquet,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
private def readSupportedTypesForPlugin(
source: BufferedSource): Map[String, Map[String, Seq[String]]] = {
source: BufferedSource): (Map[String, Map[String, Seq[String]]], ArrayBuffer[String]) = {
// get the types the Rapids Plugin supports
val allSupportedReadSources = HashMap.empty[String, Map[String, Seq[String]]]
val allSupportedWriteFormats = ArrayBuffer[String]()
try {
val fileContents = source.getLines().toSeq
if (fileContents.size < 2) {
@@ -85,12 +92,14 @@ class PluginTypeChecker {
val allNsTypes = nsTypes.flatMap(t => getOtherTypes(t) :+ t)
val allBySup = HashMap(NS -> allNsTypes)
allSupportedReadSources.put(format, allBySup.toMap)
} else if (direction.equals("write")) {
allSupportedWriteFormats += format
}
}
} finally {
source.close()
}
allSupportedReadSources.toMap
(allSupportedReadSources.toMap, allSupportedWriteFormats)
}
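// For the shipped supportedDataSource.csv this yields, roughly (a sketch, assuming
// format names are lowercased when the CSV is parsed, as the lowercase lookups below
// suggest):
//   readFormatsAndTypes: Map("csv" -> Map("NS" -> Seq("binary", ...)), ...)
//   writeFormats:        ArrayBuffer("orc", "parquet")  // only formats with a "write" row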

def getOtherTypes(typeRead: String): Seq[String] = {
@@ -114,7 +123,7 @@
def scoreReadDataTypes(format: String, schema: String): (Double, Set[String]) = {
val schemaLower = schema.toLowerCase
val formatInLower = format.toLowerCase
val typesBySup = formatsToSupportedCategory.get(formatInLower)
val typesBySup = readFormatsAndTypes.get(formatInLower)
val score = typesBySup match {
case Some(dtSupMap) =>
// check if any of the not supported types are in the schema
Expand All @@ -140,4 +149,9 @@ class PluginTypeChecker {
}
score
}

def isWriteFormatsupported(writeFormat: ArrayBuffer[String]): ArrayBuffer[String] = {
writeFormat.map(x => x.toLowerCase.trim).filterNot(
writeFormats.map(x => x.trim).contains(_))
}
}
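
A minimal usage sketch of the new write-format check (hypothetical inputs; assumes the default
`supportedDataSource.csv` above is on the classpath and that format names are lowercased when it is
parsed):

```scala
import scala.collection.mutable.ArrayBuffer
import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

val checker = new PluginTypeChecker()
// Formats captured from write nodes are lowercased and trimmed, then any format
// with a "write" row in the CSV (orc, parquet) is filtered out as supported.
val unsupported = checker.isWriteFormatsupported(ArrayBuffer(" JSON", " Parquet", " CSV"))
// unsupported == ArrayBuffer("json", "csv")
```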
@@ -44,7 +44,8 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean, printStdout
val initHeader = s"App Name,$appIdStr,Score,Potential Problems,$sqlDurStr,$taskDurStr," +
s"$appDurStr,Executor CPU Time Percent,App Duration Estimated," +
"SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent," +
"Read File Format Score,Unsupported Read File Formats and Types"
"Read File Format Score,Unsupported Read File Formats and Types," +
"Unsupported Write Data Format,Unsupported Nested Types"
if (reportReadSchema) {
initHeader + ",Read Schema"
} else {
@@ -73,16 +74,19 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean, printStdout
val failedIds = stringIfempty(appSum.failedSQLIds)
// since csv, replace any commas with ; in the schema
val readFileFormats = stringIfempty(appSum.readFileFormats.replace(",", ";"))
val complexTypes = stringIfempty(appSum.complexTypes.replace(",", ";"))
val nestedComplexTypes = stringIfempty(appSum.nestedComplexTypes.replace(",", ";"))
val readFileScoreRounded = f"${appSum.readFileFormatScore}%1.2f"
val readFormatNS = stringIfempty(appSum.readFileFormatAndTypesNotSupported)
val writeDataFormat = stringIfempty(appSum.writeDataFormat)


val initRow = s"$appNameStr,$appIdStr,${appSum.score},$probStr," +
s"${appSum.sqlDataFrameDuration},${appSum.sqlDataframeTaskDuration}," +
s"${appSum.appDuration},${appSum.executorCpuTimePercent}," +
s"${appSum.endDurationEstimated},${appSum.sqlDurationForProblematic},$failedIds," +
s"${appSum.readScorePercent},${appSum.readFileFormatScore}," +
s"${readFormatNS}"
s"${readFormatNS},${writeDataFormat},${complexTypes},${nestedComplexTypes}"
if (reportReadSchema) {
initRow + s",$readFileFormats"
} else {
@@ -41,6 +41,7 @@ class QualAppInfo(
var isPluginEnabled = false
var lastJobEndTime: Option[Long] = None
var lastSQLEndTime: Option[Long] = None
val writeDataFormat: ArrayBuffer[String] = ArrayBuffer[String]()

var appInfo: Option[QualApplicationInfo] = None
val sqlStart: HashMap[Long, QualSQLExecutionInfo] = HashMap[Long, QualSQLExecutionInfo]()
@@ -184,6 +185,91 @@
}.getOrElse(1.0)
}

def reportComplexTypes: (String, String) = {
if (dataSourceInfo.nonEmpty) {
val schema = dataSourceInfo.map { ds => ds.schema }
parseReadSchemaForNestedTypes(schema)
} else {
("", "")
}
}

private def parseReadSchemaForNestedTypes(schema: ArrayBuffer[String]): (String, String) = {
val tempStringBuilder = new StringBuilder()
val individualSchema: ArrayBuffer[String] = new ArrayBuffer()
var angleBracketsCount = 0
var parenthesesCount = 0
val distinctSchema = schema.distinct.mkString(",")

// Get the nested types, i.e. everything between < and >
for (char <- distinctSchema) {
char match {
case '<' => angleBracketsCount += 1
case '>' => angleBracketsCount -= 1
// If the schema has decimals, e.g. decimal(6,2), we have to make sure both the opening
// and closing parentheses are seen (unless the string is incomplete due to the V2 reader).
case '(' => parenthesesCount += 1
case ')' => parenthesesCount -= 1
case _ =>
}
if (angleBracketsCount == 0 && parenthesesCount == 0 && char.equals(',')) {
individualSchema += tempStringBuilder.toString
tempStringBuilder.setLength(0)
} else {
tempStringBuilder.append(char)
}
}
if (!tempStringBuilder.isEmpty) {
individualSchema += tempStringBuilder.toString
}
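// e.g. "a:int,b:array<struct<x:string,y:string>>,c:decimal(6,2)" splits into
// Seq("a:int", "b:array<struct<x:string,y:string>>", "c:decimal(6,2)"); commas
// inside <...> and (...) are protected by the bracket counters above.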

// If DataSource V2 is used, then Schema may be incomplete with ... appended at the end.
// We determine complex types and nested complex types until ...
val incompleteSchema = individualSchema.filter(x => x.contains("..."))
val completeSchema = individualSchema.filterNot(x => x.contains("..."))

// For truncated entries, extract the type text between ':' and the trailing '...'
val incompleteTypes = incompleteSchema.map { x =>
if (x.contains("...") && x.contains(":")) {
x.split(":", 2)(1).split("\\.\\.\\.")(0)
} else {
""
}
}
// Drop the column name and keep only the type
val completeTypes = completeSchema.map(x => x.split(":", 2)(1))
val schemaTypes = completeTypes ++ incompleteTypes

// Filter only complex types.
// Example: array<string>, array<struct<string, string>>
val complexTypes = schemaTypes.filter(x =>
x.startsWith("array<") || x.startsWith("map<") || x.startsWith("struct<"))

// Determine nested complex types from complex types
// Example: array<struct<string, string>> is nested complex type.
val nestedComplexTypes = complexTypes.filter(complexType => {
val startIndex = complexType.indexOf('<')
val closedBracket = complexType.lastIndexOf('>')
// If the string is incomplete due to DSv2, the closing '>' may be missing; in that
// case traverse to the end of the incomplete string.
val lastIndex = if (closedBracket == -1) {
complexType.length - 1
} else {
closedBracket
}
val string = complexType.substring(startIndex, lastIndex + 1)
string.contains("array<") || string.contains("struct<") || string.contains("map<")
})
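// e.g. "array<struct<city:string,state:string>>" keeps "struct<" inside the brackets
// and is nested; "array<string>" has no complex type inside and is filtered out.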

// Since it is saved as csv, replace commas with ;
val complexTypesResult = complexTypes.filter(_.nonEmpty).mkString(";").replace(",", ";")
val nestedComplexTypesResult = nestedComplexTypes.filter(
_.nonEmpty).mkString(";").replace(",", ";")

(complexTypesResult, nestedComplexTypesResult)
}
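// End-to-end sketch (hypothetical input):
//   schema = ArrayBuffer("name:string,friends:array<struct<first:string,last:string>>")
//   returns ("array<struct<first:string;last:string>>",
//            "array<struct<first:string;last:string>>")
// Commas are replaced with ';' because the report is written as csv.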

def aggregateStats(): Option[QualificationSummaryInfo] = {
appInfo.map { info =>
val appDuration = calculateAppDuration(info.startTime).getOrElse(0L)
@@ -205,12 +291,14 @@ class QualAppInfo(
val typeString = types.mkString(":").replace(",", ":")
s"${format}[$typeString]"
}.mkString(";")
val writeFormat = writeFormatNotSupported(writeDataFormat)
val (allComplexTypes, nestedComplexTypes) = reportComplexTypes

new QualificationSummaryInfo(info.appName, appId, scoreRounded, problems,
sqlDataframeDur, sqlDataframeTaskDuration, appDuration, executorCpuTimePercent,
endDurationEstimated, sqlDurProblem, failedIds, readScorePercent,
readScoreHumanPercentRounded, notSupportFormatAndTypesString,
getAllReadFileFormats)
getAllReadFileFormats, writeFormat, allComplexTypes, nestedComplexTypes)
}
}

@@ -228,8 +316,22 @@ class QualAppInfo(
val existingIssues = sqlIDtoProblematic.getOrElse(sqlID, Set.empty[String])
sqlIDtoProblematic(sqlID) = existingIssues ++ issues
}
// Get the write data format
if (node.name.contains("InsertIntoHadoopFsRelationCommand")) {
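// A (hypothetical) desc looks like:
// "Execute InsertIntoHadoopFsRelationCommand file:/tmp/out, false, CSV, Map(...), ..."
// so the third comma-separated field carries the format; it is trimmed downstream.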
val writeFormat = node.desc.split(",")(2)
writeDataFormat += writeFormat
}
}
}

def writeFormatNotSupported(writeFormat: ArrayBuffer[String]): String = {
// Filter unsupported write data format
val unSupportedWriteFormat = pluginTypeChecker.map { checker =>
checker.isWriteFormatsupported(writeFormat)
}.getOrElse(ArrayBuffer[String]())

unSupportedWriteFormat.distinct.mkString(";").toUpperCase
}
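// e.g. writeDataFormat = ArrayBuffer(" JSON", " CSV", " JSON") -> "JSON;CSV",
// matching the "Unsupported Write Data Format" column in the sample csv output.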
}

class StageTaskQualificationSummary(
@@ -273,7 +375,10 @@ case class QualificationSummaryInfo(
readScorePercent: Int,
readFileFormatScore: Double,
readFileFormatAndTypesNotSupported: String,
readFileFormats: String)
readFileFormats: String,
writeDataFormat: String,
complexTypes: String,
nestedComplexTypes: String)

object QualAppInfo extends Logging {
def createApp(
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types
Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:decimal]
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Spark shell,local-1626104300434,1322.1,DECIMAL,2429,1469,131104,88.35,false,160,"",20,50.0,Parquet[decimal];ORC[map:decimal],"",struct<firstname:string;middlename:array<string>;lastname:string>;struct<current:struct<state:string;city:string>;previous:struct<state:map<string;string>;city:string>>;array<struct<city:string;state:string>>;map<string;string>;map<string;array<string>>;map<string;map<string;string>>;array<array<string>>;array<string>,struct<firstname:string;middlename:array<string>;lastname:string>;struct<current:struct<state:string;city:string>;previous:struct<state:map<string;string>;city:string>>;array<struct<city:string;state:string>>;map<string;array<string>>;map<string;map<string;string>>;array<array<string>>
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types
Spark shell,local-1623876083964,1417661.00,"",119903,1417661,133857,91.14,false,0,"",20,100.00,""
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Spark shell,local-1623876083964,1417661.00,"",119903,1417661,133857,91.14,false,0,"",20,100.00,"","","",""
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types
Spark shell,local-1626189209260,1052.3,DECIMAL,1314,1238,106033,57.21,false,1023,"",20,25.0,Parquet[decimal]
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Spark shell,local-1626189209260,1052.3,DECIMAL,1314,1238,106033,57.21,false,1023,"",20,25.0,Parquet[decimal],"",array<struct<city:string;state:string>>;map<string;string>;map<string;array<string>>;map<string;map<string;string>>,array<struct<city:string;state:string>>;map<string;array<string>>;map<string;map<string;string>>
@@ -1,2 +1,2 @@
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types
Spark shell,local-1623876083964,1417661.00,"",119903,1417661,133857,91.14,false,0,"",20,100.00,""
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,ReadFileFormat Score,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Unsupported Nested Complex Types
Spark shell,local-1623876083964,1417661.00,"",119903,1417661,133857,91.14,false,0,"",20,100.00,"","","",""