Benchmark utility to perform diff of output from benchmark runs, allowing for precision differences (#782)

* Benchmark automation POC

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Default to collecting all results

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* address feedback

* make results action configurable, allowing for results to be written to parquet/csv

* fix typo in javadoc

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* remove unused imports

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Remove cold/hot run loops since they were causing confusion

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* gc between runs

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Make gc optional

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* update test

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Provide specific benchmark methods for collect vs write to CSV or Parquet

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* provide convenience methods to run benchmarks and store action in json

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Add utility method to perform a diff of the data collected from two DataFrames

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Add missing license header and remove unused import

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Provide option to ignore ordering of results

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* revert change to compare method

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* remove rmm_log.txt

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* optimize sorting

* let spark sort the data before collecting it

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* fix message

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Improved documentation

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* bug fix and optimization to non-iterator case

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* fix typo in javadoc

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* fail fast if row counts do not match

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* remove redundant logic

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* scalastyle

Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Sep 21, 2020
1 parent 007f884 commit fdf731c
Showing 1 changed file with 176 additions and 12 deletions.
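The intended workflow is to run the timed benchmarks first, writing each result set to disk, and then use the new compareResults utility to diff those result sets offline. A minimal sketch of that comparison step, adapted from the javadoc example in the diff below (the paths are illustrative and the non-zero epsilon is an arbitrary tolerance, not a recommended value):

    // Load the result sets written by two earlier benchmark runs (paths are placeholders).
    val cpu = spark.read.parquet("/data/q5-cpu")
    val gpu = spark.read.parquet("/data/q5-gpu")

    import com.nvidia.spark.rapids.tests.common._

    // Sort both result sets before comparing, and allow a relative difference of up to
    // 1e-5 between matching floating-point values.
    BenchUtils.compareResults(cpu, gpu, ignoreOrdering = true, epsilon = 1e-5)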
@@ -19,14 +19,16 @@ import java.io.{File, FileOutputStream}
import java.time.Instant
import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.convert.ImplicitConversions.`iterator asScala`
import scala.collection.mutable.ListBuffer

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.Serialization.writePretty

import org.apache.spark.{SPARK_BUILD_USER, SPARK_VERSION}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object BenchUtils {

@@ -96,15 +98,15 @@ object BenchUtils {
* query and results to file, including all Spark configuration options and environment
* variables.
*
* @param spark The Spark session
* @param createDataFrame Function to create a DataFrame from the Spark session.
* @param resultsAction Optional action to perform after creating the DataFrame. The default
*                      behavior is to call df.collect(), but the user can provide a function
*                      that saves the results to CSV or Parquet instead.
* @param filenameStub The prefix for the output file. The current timestamp will be appended
*                     to ensure that filenames are unique and that results are not
*                     inadvertently overwritten.
* @param iterations The number of times to run the query.
*/
def runBench(
spark: SparkSession,
@@ -114,9 +116,9 @@
filenameStub: String,
iterations: Int,
gcBetweenRuns: Boolean
): Unit = {

assert(iterations > 0)

val queryStartTime = Instant.now()

@@ -160,7 +162,7 @@
println(s"Best of $numHotRuns hot run(s): ${hotRuns.min} msec.")
println(s"Worst of $numHotRuns hot run(s): ${hotRuns.max} msec.")
println(s"Average of $numHotRuns hot run(s): " +
s"${hotRuns.sum.toDouble/numHotRuns} msec.")
s"${hotRuns.sum.toDouble / numHotRuns} msec.")
}

// write results to file
Expand Down Expand Up @@ -246,6 +248,168 @@ object BenchUtils {
}
}

/**
* Perform a diff of the results collected from two DataFrames, allowing for differences in
* precision.
*
* The intended usage is to run timed benchmarks that write results to file and then separately
* use this utility to compare those result sets. This code performs a sort and a collect and
* is only suitable for data sets that can fit in the driver's memory. For larger datasets,
* a better approach would be to convert the results to single files, download them locally
* and adapt this Scala code to read those files directly (without using Spark).
*
* Example usage:
*
* <pre>
* scala> val cpu = spark.read.parquet("/data/q5-cpu")
* scala> val gpu = spark.read.parquet("/data/q5-gpu")
* scala> import com.nvidia.spark.rapids.tests.common._
* scala> BenchUtils.compareResults(cpu, gpu, ignoreOrdering=true, epsilon=0.0)
* Collecting rows from DataFrame
* Collected 989754 rows in 7.701 seconds
* Collecting rows from DataFrame
* Collected 989754 rows in 2.325 seconds
* Results match
* </pre>
*
* @param df1 DataFrame to compare.
* @param df2 DataFrame to compare.
* @param ignoreOrdering Sort the data collected from the DataFrames before comparing them.
* @param useIterator When set to true, use `toLocalIterator` to load one partition at a time
* into driver memory, reducing memory usage at the cost of performance
* because processing will be single-threaded.
* @param maxErrors Maximum number of differences to report.
* @param epsilon Allow for differences in precision when comparing floating point values.
*/
def compareResults(
df1: DataFrame,
df2: DataFrame,
ignoreOrdering: Boolean,
useIterator: Boolean = false,
maxErrors: Int = 10,
epsilon: Double = 0.00001): Unit = {

val count1 = df1.count()
val count2 = df2.count()

if (count1 == count2) {
println(s"Both DataFrames contain $count1 rows")
val result1 = collectResults(df1, ignoreOrdering, useIterator)
val result2 = collectResults(df2, ignoreOrdering, useIterator)

var errors = 0
var i = 0
while (i < count1 && errors < maxErrors) {
val l = result1.next()
val r = result2.next()
if (!rowEqual(l, r, epsilon)) {
println(s"Row $i:\n${l.mkString(",")}\n${r.mkString(",")}\n")
errors += 1
}
i += 1
}
println(s"Processed $i rows")

if (errors == maxErrors) {
println(s"Aborting comparison after reaching maximum of $maxErrors errors")
} else if (errors == 0) {
println(s"Results match")
} else {
println(s"There were $errors errors")
}
} else {
println(s"DataFrame row counts do not match: $count1 != $count2")
}
}

private def collectResults(
df: DataFrame,
ignoreOrdering: Boolean,
useIterator: Boolean): Iterator[Seq[Any]] = {

// apply sorting if specified
val resultDf = if (ignoreOrdering) {
// let Spark do the sorting
df.sort(df.columns.map(col): _*)
} else {
df
}

val it: Iterator[Row] = if (useIterator) {
resultDf.toLocalIterator()
} else {
println("Collecting rows from DataFrame")
val t1 = System.currentTimeMillis()
val rows = resultDf.collect()
val t2 = System.currentTimeMillis()
println(s"Collected ${rows.length} rows in ${(t2-t1)/1000.0} seconds")
rows.toIterator
}

// map Iterator[Row] to Iterator[Seq[Any]]
it.map(_.toSeq)
}

private def rowEqual(row1: Seq[Any], row2: Seq[Any], epsilon: Double): Boolean = {
row1.zip(row2).forall {
case (l, r) => compare(l, r, epsilon)
}
}

// this is copied from SparkQueryCompareTestSuite
private def compare(expected: Any, actual: Any, epsilon: Double = 0.0): Boolean = {
def doublesAreEqualWithinPercentage(expected: Double, actual: Double): (String, Boolean) = {
if (!compare(expected, actual)) {
if (expected != 0) {
val v = Math.abs((expected - actual) / expected)
(s"\n\nABS($expected - $actual) / ABS($actual) == $v is not <= $epsilon ", v <= epsilon)
} else {
val v = Math.abs(expected - actual)
(s"\n\nABS($expected - $actual) == $v is not <= $epsilon ", v <= epsilon)
}
} else {
("SUCCESS", true)
}
}
(expected, actual) match {
case (a: Float, b: Float) if a.isNaN && b.isNaN => true
case (a: Double, b: Double) if a.isNaN && b.isNaN => true
case (null, null) => true
case (null, _) => false
case (_, null) => false
case (a: Array[_], b: Array[_]) =>
a.length == b.length && a.zip(b).forall { case (l, r) => compare(l, r, epsilon) }
case (a: Map[_, _], b: Map[_, _]) =>
a.size == b.size && a.keys.forall { aKey =>
b.keys.find(bKey => compare(aKey, bKey))
.exists(bKey => compare(a(aKey), b(bKey)))
}
case (a: Iterable[_], b: Iterable[_]) =>
a.size == b.size && a.zip(b).forall { case (l, r) => compare(l, r, epsilon) }
case (a: Product, b: Product) =>
compare(a.productIterator.toSeq, b.productIterator.toSeq, epsilon)
case (a: Row, b: Row) =>
compare(a.toSeq, b.toSeq, epsilon)
// 0.0 == -0.0 in Scala, so compare the raw float/double bits to distinguish 0.0 from -0.0.
case (a: Double, b: Double) if epsilon <= 0 =>
java.lang.Double.doubleToRawLongBits(a) == java.lang.Double.doubleToRawLongBits(b)
case (a: Double, b: Double) if epsilon > 0 =>
val ret = doublesAreEqualWithinPercentage(a, b)
if (!ret._2) {
System.err.println(ret._1 + " (double)")
}
ret._2
case (a: Float, b: Float) if epsilon <= 0 =>
java.lang.Float.floatToRawIntBits(a) == java.lang.Float.floatToRawIntBits(b)
case (a: Float, b: Float) if epsilon > 0 =>
val ret = doublesAreEqualWithinPercentage(a, b)
if (!ret._2) {
System.err.println(ret._1 + " (float)")
}
ret._2
case (a, b) => a == b
}
}
}

/** Top level benchmark report class */
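On the precision tolerance in the comparison logic above: when epsilon is greater than zero, compare() treats two doubles (or floats) as equal if their relative difference is within epsilon (or their absolute difference, when the expected value is zero); with an epsilon of zero it falls back to comparing raw bit patterns. A small worked sketch of that check (the values are made up purely to illustrate the arithmetic; this snippet is not part of the commit):

    // Relative-difference check, as performed by compare() when epsilon > 0.
    val expected = 123.456789
    val actual = 123.456788                                  // differs by ~1e-6 in absolute terms
    val relDiff = math.abs((expected - actual) / expected)   // ~8.1e-9
    assert(relDiff <= 0.00001)                               // within the default epsilon of compareResults

    // With epsilon = 0.0 the raw bit patterns are compared instead
    // (java.lang.Double.doubleToRawLongBits), so the same pair would be reported as a difference.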
