[FEA] Adding Benchmarking classes to evaluate core tools performance #1169

Merged · 29 commits · Jul 15, 2024

Commits
5fa3fff
Add Benchmarking to evaluate the core tools performance
amahussein Jul 1, 2024
cf099b6
Adding changes for argument parsing
bilalbari Jul 2, 2024
1f5155d
changing warmup time to warmup iterations
bilalbari Jul 2, 2024
ad4e562
Removing unused imports
bilalbari Jul 2, 2024
303ee60
Adding Qualification Benchmark
bilalbari Jul 2, 2024
6137021
Changes for review comments
bilalbari Jul 2, 2024
4f85b8d
Changes for review comments
bilalbari Jul 2, 2024
0131c71
removing name param from scallop options
bilalbari Jul 2, 2024
8e3b18b
Merge pull request #11 from amahussein/spark-rapids-tools-1120-FEA_ARGS
bilalbari Jul 2, 2024
1026e69
Adding GC Metrics (#12)
bilalbari Jul 5, 2024
d9a8be1
Review changes
bilalbari Jul 9, 2024
9033c82
Correcting scalastyle failure
bilalbari Jul 9, 2024
9199881
Correcting passed argument name for semantic clarity
bilalbari Jul 10, 2024
3001cb3
Short flag + desc - update
bilalbari Jul 10, 2024
118a505
Updating short flag usage
bilalbari Jul 10, 2024
eaa6a2c
Refactor for correcting structure
bilalbari Jul 10, 2024
b6e7b3f
Review comments changes
bilalbari Jul 11, 2024
73d3992
Review comment changes
bilalbari Jul 11, 2024
3d9290f
Adding separator as a val
bilalbari Jul 11, 2024
d6bc2da
Adding README for benchmark
bilalbari Jul 11, 2024
3a8d4c5
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into spark…
bilalbari Jul 12, 2024
25a4d80
Review changes for README
bilalbari Jul 12, 2024
9b59546
Corrected wording in README
bilalbari Jul 12, 2024
2000e28
Updated README review changes
bilalbari Jul 12, 2024
f4d3293
Correcting README typo
bilalbari Jul 12, 2024
a7db0dd
Correcting README typo
bilalbari Jul 12, 2024
c424cc9
README typo - RAPIDS_TOOLs -> SPARK_RAPIDS..
bilalbari Jul 12, 2024
9fe291e
Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into spark…
bilalbari Jul 15, 2024
17c2a05
Adding license header + README changes
bilalbari Jul 15, 2024
Files changed (showing changes from 4 commits)

================================================================================
@@ -16,15 +16,11 @@

package org.apache.spark.rapids.tool.benchmarks

-import java.io.{OutputStream, PrintStream}
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.NANOSECONDS

-import org.apache.commons.io.output.TeeOutputStream
-
-import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, RuntimeUtil, ToolsTimer}
+import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, ToolsTimer}

/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
@@ -38,19 +34,12 @@
*/
class Benchmark(
name: String = "Benchmarker",
-valuesPerIteration: Long,
minNumIters: Int,
warmUpIterations: Int,
-outputPerIteration: Boolean = false,
-output: Option[OutputStream] = None) {
+outputPerIteration: Boolean = false) {
import Benchmark._

val benchmarks: mutable.ArrayBuffer[Case] = mutable.ArrayBuffer.empty[Benchmark.Case]
-val out: PrintStream = if (output.isDefined) {
-new PrintStream(new TeeOutputStream(System.out, output.get))
-} else {
-System.out
-}

/**
* Adds a case to run when run() is called. The given function will be run for several
@@ -84,54 +73,25 @@
* a comment with the benchmark. Although the results vary from machine to machine, it should
* provide some baseline.
*/
-def run(): Unit = {
+def run(): mutable.ArrayBuffer[Result] = {
require(benchmarks.nonEmpty)
println("-" * 80)
println("Running benchmark: " + name)
println("-" * 80)
val results = benchmarks.map { c =>
println(" RUNNING CASE : " + c.name)
println("-" * 80)
-measure(valuesPerIteration, c.numIters)(c.fn)
+measure(c.name, c.numIters)(c.fn)
}
println

-val firstBest = results.head.bestMs
-val jvmInfo = RuntimeUtil.getJVMOSInfo
-out.printf(s"%-26s : %s \n","JVM Name", jvmInfo("jvm.name"))
-out.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
-out.printf(s"%-26s : %s \n","OS Name", jvmInfo("os.name"))
-out.printf(s"%-26s : %s \n","OS Version", jvmInfo("os.version"))
-out.printf(s"%-26s : %s MB \n","MaxHeapMemory",
-(Runtime.getRuntime.maxMemory()/1024/1024).toString)
-out.printf(s"%-26s : %s \n","Total Warm Up Iterations", warmUpIterations.toString)
-out.printf(s"%-26s : %s \n \n","Total Runtime Iterations", minNumIters.toString)
-val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
-out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
-name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)","Avg GC Time(ms)",
-"Avg GC Count", "Stdev GC Count","Max GC Time(ms)","Max GC Count", "Relative")
-out.println("-" * (nameLen + 160))
-results.zip(benchmarks).foreach { case (result, benchmark) =>
-out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
-benchmark.name,
-"%5.0f" format result.bestMs,
-"%4.0f" format result.avgMs,
-"%5.0f" format result.stdevMs,
-"%5.1f" format result.memoryParams.avgGCTime,
-"%5.1f" format result.memoryParams.avgGCCount,
-"%5.0f" format result.memoryParams.stdDevGCCount,
-"%5d" format result.memoryParams.maxGcTime,
-"%5d" format result.memoryParams.maxGCCount,
-"%3.2fX" format (firstBest / result.bestMs))
-}
-out.println()
+results
}

/**
* Runs a single function `f` for iters, returning the average time the function took and
* the rate of the function.
*/
-def measure(num: Long, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
+def measure(name: String, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
System.gc() // ensures garbage from previous cases don't impact this one
for (wi <- 0 until warmUpIterations) {
f(new ToolsTimer(-1))
@@ -177,7 +137,9 @@
val avgGcCount = gcCounts.sum / minIters
val avgGcTime = gcTimes.sum / minIters
val maxGcTime = gcTimes.max
-Benchmark.Result(avgRuntime / 1000000.0, bestRuntime / 1000000.0, stdevRunTime / 1000000.0,
+Benchmark.Result(name, avgRuntime / 1000000.0,
+bestRuntime / 1000000.0,
+stdevRunTime / 1000000.0,
JVMMemoryParams(avgGcTime, avgGcCount, stdevGcCount, maxGcCount, maxGcTime))
}
}
@@ -187,6 +149,6 @@ object Benchmark {
case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
case class JVMMemoryParams( avgGCTime:Double, avgGCCount:Double,
stdDevGCCount: Double, maxGCCount: Long, maxGcTime:Long)
-case class Result(avgMs: Double, bestMs: Double, stdevMs: Double,
+case class Result(caseName: String, avgMs: Double, bestMs: Double, stdevMs: Double,
memoryParams: JVMMemoryParams)
}
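
A minimal sketch (not part of this PR) of how the slimmed-down Benchmark could now be driven on its own. The workload and iteration counts are illustrative, and the addCase signature is assumed from the BenchmarkBase wrapper later in this diff:

import org.apache.spark.rapids.tool.benchmarks.Benchmark

object BenchmarkSketch {
  def main(args: Array[String]): Unit = {
    // Iteration counts are illustrative.
    val bench = new Benchmark(
      name = "SketchSuite",
      minNumIters = 5,        // measured iterations per case
      warmUpIterations = 2)   // discarded warm-up runs
    // Assumed addCase shape: (name, numIters)(body), as forwarded by BenchmarkBase.
    bench.addCase("sum-1M") { _ =>
      var acc = 0L
      (1L to 1000000L).foreach(acc += _)
    }
    // run() now returns per-case results instead of printing the report itself;
    // report rendering has moved into BenchmarkBase.run().
    val results = bench.run()
    for (r <- results) {
      println(s"${r.caseName}: best=${r.bestMs} ms, avg=${r.avgMs} ms, " +
        s"avg GC=${r.memoryParams.avgGCTime} ms")
    }
  }
}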
================================================================================
@@ -39,10 +39,12 @@ Benchmarker class for running various benchmarks.
default = Some("text"), descr = "Output format for the benchmark results. For text" +
" the result output will be tabular. In case of json , the results" +
"will be JSON formatted. Currently supported formats are text, json")
-val extraArgs: ScallopOption[String] = opt[String](short = 'a',
+// Conflict with `iterations` for using short flag - `i`.
+// Going with - `a` for now
+val inputArgs: ScallopOption[String] = opt[String](short = 'a',
required = false,
descr = "Extra arguments to pass to the benchmark.These arguments will be sent to the " +
"benchmark class. The format is space separated arguments. For example " +
descr = "Input arguments to pass to the benchmark suite. Used as common arguments across " +
"benchmarks. The format is space separated arguments. For example " +
"--output-directory /tmp --per-sql /tmp/eventlogs")
verify()
}
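
For context, a hedged sketch of the renamed option in a standalone Scallop config. The enclosing class name here is hypothetical (the diff does not show it), and the long-form flag --input-args is assumed from Scallop's default name guessing for the val name inputArgs:

import org.rogach.scallop.{ScallopConf, ScallopOption}

// Hypothetical stand-in for the PR's benchmark args class, reduced to the
// renamed option; the real class name is not shown in this diff.
class SketchBenchArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
  val inputArgs: ScallopOption[String] = opt[String](short = 'a',
    required = false,
    descr = "Input arguments to pass to the benchmark suite.")
  verify()
}

object SketchBenchArgs {
  def main(args: Array[String]): Unit = {
    // Long flag assumed to be derived as --input-args; a dash-free value is
    // used here to keep the sketch parser-safe.
    val conf = new SketchBenchArgs(
      Seq("--input-args", "/tmp/eventlogs1 /tmp/eventlogs2"))
    // BenchmarkBase later splits this single string on whitespace.
    val split = conf.inputArgs().split("\\s+").filter(_.nonEmpty)
    println(split.mkString(" | "))  // /tmp/eventlogs1 | /tmp/eventlogs2
  }
}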
================================================================================
@@ -16,7 +16,13 @@

package org.apache.spark.rapids.tool.benchmarks

-import java.io.{File, FileOutputStream, OutputStream}
+import java.io.{File, FileOutputStream, PrintStream}

+import scala.collection.mutable
+
+import org.apache.commons.io.output.TeeOutputStream
+
+import org.apache.spark.sql.rapids.tool.util.RuntimeUtil

/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
@@ -26,16 +32,14 @@ import java.io.{File, FileOutputStream, OutputStream}
* the generated results.
*/
abstract class BenchmarkBase {
-var output: Option[OutputStream] = None
+private var output: Option[PrintStream] = None
+private var benchmark: Option[Benchmark] = None
/**
* Main process of the whole benchmark.
* Implementations of this method are supposed to use the wrapper method `runBenchmark`
* for each benchmark scenario.
*/
-def runBenchmarkSuite(iterations: Int,
-warmUpIterations: Int,
-outputFormat: String,
-extraArgs: Array[String]): Unit
+def runBenchmarkSuite(inputArgs: Array[String]): Unit

final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
val separator = "=" * 96
@@ -47,6 +51,69 @@ abstract class BenchmarkBase {
def prefix: String = "rapids-tools-benchmark"
def suffix: String = ""

+/**
+* Add a benchmark case to the suite
+* @param name Name of the benchmark case
+* @param numIters Number of iterations to run
+* @param f Function to run
+*/
+def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+benchmark.get.addCase(name, numIters)(f)
+}
+
+/**
+* Method to trigger the benchmarker cases run method
+*/
+def run(): Unit = {
+// Getting correct output stream
+val printStream = output.getOrElse(System.out)
+// Running the underlying benchmark
+val results: mutable.ArrayBuffer[Benchmark.Result] = benchmark.get.run()
+// Generating the output report
+val firstBest = results.head.bestMs
+val nameLen = Math.max(40, results.map(_.caseName.length).max)
+printStream.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
+"Benchmark :", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)","Avg GC Time(ms)",
+"Avg GC Count", "Stdev GC Count","Max GC Time(ms)","Max GC Count", "Relative")
+printStream.println("-" * (nameLen + 160))
+results.foreach { result =>
+printStream.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
+result.caseName,
+"%5.0f" format result.bestMs,
+"%4.0f" format result.avgMs,
+"%5.0f" format result.stdevMs,
+"%5.1f" format result.memoryParams.avgGCTime,
+"%5.1f" format result.memoryParams.avgGCCount,
+"%5.0f" format result.memoryParams.stdDevGCCount,
+"%5d" format result.memoryParams.maxGcTime,
+"%5d" format result.memoryParams.maxGCCount,
+"%3.2fX" format (firstBest / result.bestMs))
+}
+printStream.println()
+}

+/**
+* Method to print the system run specific information
+* @param warmUpIterations Total warm up iterations
+* @param iterations Total runtime iterations
+* @param inputArgs Input arguments
+*/
+private def printSystemInformation(warmUpIterations: Int, iterations: Int,
+inputArgs: String ): Unit = {
+val jvmInfo = RuntimeUtil.getJVMOSInfo
+output.get.printf(s"%-26s : %s \n","JVM Name", jvmInfo("jvm.name"))
+output.get.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
+output.get.printf(s"%-26s : %s \n","OS Name", jvmInfo("os.name"))
+output.get.printf(s"%-26s : %s \n","OS Version", jvmInfo("os.version"))
+output.get.printf(s"%-26s : %s MB \n","MaxHeapMemory",
+(Runtime.getRuntime.maxMemory()/1024/1024).toString)
+output.get.printf(s"%-26s : %s \n","Total Warm Up Iterations",
+warmUpIterations.toString)
+output.get.printf(s"%-26s : %s \n","Total Runtime Iterations",
+iterations.toString)
+output.get.printf(s"%-26s : %s \n \n","Input Arguments", inputArgs)
+}

/**
* Any shutdown code to ensure a clean shutdown
*/
Expand All @@ -65,16 +132,20 @@ abstract class BenchmarkBase {
if (!file.exists()) {
file.createNewFile()
}
-output = Some(new FileOutputStream(file))
-runBenchmarkSuite(benchArgs.iterations(),
-benchArgs.warmupIterations(),
-benchArgs.outputFormat(),
-benchArgs.extraArgs().split("\\s+").filter(_.nonEmpty))
-output.foreach { o =>
-if (o != null) {
-o.close()
-}
-}
+// Creating a new output stream
+// Using TeeOutputStream to multiplex output to both file and stdout
+val outputStream = new FileOutputStream(file)
+output = Some(new PrintStream(new TeeOutputStream(System.out, outputStream)))
+benchmark = Some(new Benchmark(minNumIters = benchArgs.iterations(),
+warmUpIterations = benchArgs.warmupIterations(),
+outputPerIteration = true))
+// Printing the system information
+printSystemInformation(benchArgs.warmupIterations(),
+benchArgs.iterations(), benchArgs.inputArgs())
+// Passing the input arguments to the suite function
+runBenchmarkSuite(benchArgs.inputArgs().split("\\s+").filter(_.nonEmpty))
+// Closing the output stream
+outputStream.close()
afterAll()
}
}
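
The TeeOutputStream wiring above is the core of the new reporting path. A self-contained sketch of the same pattern, with an illustrative file path:

import java.io.{FileOutputStream, PrintStream}
import org.apache.commons.io.output.TeeOutputStream

object TeeSketch {
  def main(args: Array[String]): Unit = {
    // Everything printed through `ps` lands on both stdout and the file.
    val fileOut = new FileOutputStream("/tmp/rapids-tools-benchmark.txt")
    val ps = new PrintStream(new TeeOutputStream(System.out, fileOut))
    ps.println("benchmark report line")  // appears in both sinks
    ps.flush()
    fileOut.close()
  }
}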
================================================================================
@@ -28,29 +28,22 @@ import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
* 3. Write benchmarked code inside
*/
object SingleThreadedQualToolBenchmark extends BenchmarkBase {
-override def runBenchmarkSuite(iterations: Int,
-warmUpIterations: Int,
-outputFormat: String,
-extraArgs: Array[String]): Unit = {
+override def runBenchmarkSuite(inputArgs: Array[String]): Unit = {
+// Currently the input arguments are assumed to be common across cases
+// This will be improved in a follow up PR to enable passing as a config
+// file with argument support for different cases
runBenchmark("Benchmark_Per_SQL_Arg_Qualification") {
-val benchmarker =
-new Benchmark(
-valuesPerIteration = 2,
-output = output,
-outputPerIteration = true,
-warmUpIterations = warmUpIterations,
-minNumIters = iterations)
-val (prefix, suffix) = extraArgs.splitAt(extraArgs.length - 1)
-benchmarker.addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
+val (prefix, suffix) = inputArgs.splitAt(inputArgs.length - 1)
+addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
mainInternal(new QualificationArgs(prefix :+ "--per-sql" :+ "--num-threads"
:+ "1" :+ suffix.head),
enablePB = true)
}
benchmarker.addCase("Disable_Per_SQL_Arg_Qualification") { _ =>
addCase("Disable_Per_SQL_Arg_Qualification") { _ =>
mainInternal(new QualificationArgs(prefix :+ "--num-threads" :+ "1" :+ suffix.head),
enablePB = true)
}
-benchmarker.run()
+run()
}
}
}
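
To make the argument plumbing above concrete, a worked example of the splitAt logic with an assumed inputArgs value (the paths are illustrative):

object SplitSketch {
  def main(args: Array[String]): Unit = {
    // Assumed input: common flags followed by the event-log path as the last element.
    val inputArgs = Array("--output-directory", "/tmp", "/tmp/eventlogs")
    // prefix = everything but the last element, suffix = the trailing event-log argument
    val (prefix, suffix) = inputArgs.splitAt(inputArgs.length - 1)
    val perSql = prefix :+ "--per-sql" :+ "--num-threads" :+ "1" :+ suffix.head
    val noPerSql = prefix :+ "--num-threads" :+ "1" :+ suffix.head
    println(perSql.mkString(" "))   // --output-directory /tmp --per-sql --num-threads 1 /tmp/eventlogs
    println(noPerSql.mkString(" ")) // --output-directory /tmp --num-threads 1 /tmp/eventlogs
  }
}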