From 5fa3fff8b0c624b947e980f34b5368ed7db96785 Mon Sep 17 00:00:00 2001
From: "Ahmed Hussein (amahussein)"
Date: Mon, 1 Jul 2024 12:31:54 -0500
Subject: [PATCH 01/26] Add Benchmarking to evaluate the core tools performance
Signed-off-by: Ahmed Hussein (amahussein)
---
.../rapids/tool/benchmarks/Benchmark.scala | 164 ++++++++++++++++++
.../tool/benchmarks/BenchmarkBase.scala | 72 ++++++++
.../rapids/tool/util/RuntimeReporter.scala | 50 ++++++
...eportGenerator.scala => RuntimeUtil.scala} | 60 +++----
.../sql/rapids/tool/util/ToolsTimer.scala | 44 +++++
5 files changed, 356 insertions(+), 34 deletions(-)
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
create mode 100644 core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReporter.scala
rename core/src/main/scala/org/apache/spark/sql/rapids/tool/util/{RuntimeReportGenerator.scala => RuntimeUtil.scala} (58%)
create mode 100644 core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsTimer.scala
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
new file mode 100644
index 000000000..4fff5d66d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.{DurationInt, FiniteDuration, NANOSECONDS}
+
+import org.apache.commons.io.output.TeeOutputStream
+
+import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
+
+/**
+ * This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
+ *
+ * Utility class to benchmark components. An example of how to use this is:
+ * val benchmark = new Benchmark("My Benchmark", valuesPerIteration)
+ * benchmark.addCase("V1")()
+ * benchmark.addCase("V2")()
+ * benchmark.run
+ * This will output the average time to run each function and the rate of each function.
+ */
+class Benchmark(
+ name: String,
+ valuesPerIteration: Long,
+ minNumIters: Int = 2,
+ warmupTime: FiniteDuration = 2.seconds,
+ minTime: FiniteDuration = 2.seconds,
+ outputPerIteration: Boolean = false,
+ output: Option[OutputStream] = None) {
+ import Benchmark._
+
+ val benchmarks: mutable.ArrayBuffer[Case] = mutable.ArrayBuffer.empty[Benchmark.Case]
+ val out: PrintStream = if (output.isDefined) {
+ new PrintStream(new TeeOutputStream(System.out, output.get))
+ } else {
+ System.out
+ }
+
+ /**
+ * Adds a case to run when run() is called. The given function will be run for several
+ * iterations to collect timing statistics.
+ *
+ * @param name of the benchmark case
+ * @param numIters if non-zero, forces exactly this many iterations to be run
+ */
+ def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+ addTimerCase(name, numIters) { timer =>
+ timer.startTiming()
+ f(timer.iteration)
+ timer.stopTiming()
+ }
+ }
+
+ /**
+ * Adds a case with manual timing control. When the function is run, timing does not start
+ * until timer.startTiming() is called within the given function. The corresponding
+ * timer.stopTiming() method must be called before the function returns.
+ *
+ * @param name of the benchmark case
+ * @param numIters if non-zero, forces exactly this many iterations to be run
+ */
+ def addTimerCase(name: String, numIters: Int = 0)(f: ToolsTimer => Unit): Unit = {
+ benchmarks += Benchmark.Case(name, f, numIters)
+ }
+
+ /**
+ * Runs the benchmark and outputs the results to stdout. This should be copied and added as
+ * a comment with the benchmark. Although the results vary from machine to machine, it should
+ * provide some baseline.
+ */
+ def run(): Unit = {
+ require(benchmarks.nonEmpty)
+ // scalastyle:off
+ println("Running benchmark: " + name)
+ val results = benchmarks.map { c =>
+ println(" Running case: " + c.name)
+ measure(valuesPerIteration, c.numIters)(c.fn)
+ }
+ println
+
+ val firstBest = results.head.bestMs
+ // The results are going to be processor specific so it is useful to include that.
+ out.println(RuntimeUtil.getJVMOSInfo.mkString("\n"))
+ val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
+ out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+ name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)", "Per Row(ns)", "Relative")
+ out.println("-" * (nameLen + 80))
+ results.zip(benchmarks).foreach { case (result, benchmark) =>
+ out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+ benchmark.name,
+ "%5.0f" format result.bestMs,
+ "%4.0f" format result.avgMs,
+ "%5.0f" format result.stdevMs,
+ "%10.1f" format result.bestRate,
+ "%6.1f" format (1000 / result.bestRate),
+ "%3.1fX" format (firstBest / result.bestMs))
+ }
+ out.println()
+ }
+
+ /**
+ * Runs a single function `f` for iters, returning the average time the function took and
+ * the rate of the function.
+ */
+ def measure(num: Long, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
+ System.gc() // ensures garbage from previous cases don't impact this one
+ val warmupDeadline = warmupTime.fromNow
+ while (!warmupDeadline.isOverdue) {
+ f(new ToolsTimer(-1))
+ }
+ val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
+ val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
+ val runTimes = ArrayBuffer[Long]()
+ var totalTime = 0L
+ var i = 0
+ while (i < minIters || totalTime < minDuration) {
+ val timer = new ToolsTimer(i)
+ f(timer)
+ val runTime = timer.totalTime()
+ runTimes += runTime
+ totalTime += runTime
+
+ if (outputPerIteration) {
+ // scalastyle:off
+ println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
+ // scalastyle:on
+ }
+ i += 1
+ }
+ // scalastyle:off
+ println(s" Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+ // scalastyle:on
+ assert(runTimes.nonEmpty)
+ val best = runTimes.min
+ val avg = runTimes.sum / runTimes.size
+ val stdev = if (runTimes.size > 1) {
+ math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
+ } else 0
+ Benchmark.Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
+ }
+}
+
+
+object Benchmark {
+ case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
+ case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
+}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
new file mode 100644
index 000000000..400cdedc7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rapids.tool.benchmarks
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+/**
+ * This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
+ *
+ * A base class for generating benchmark results to a file.
+ * For JDK9+, JDK major version number is added to the file names to distinguish the results.
+ */
+abstract class BenchmarkBase {
+ var output: Option[OutputStream] = None
+ /**
+ * Main process of the whole benchmark.
+ * Implementations of this method are supposed to use the wrapper method `runBenchmark`
+ * for each benchmark scenario.
+ */
+ def runBenchmarkSuite(mainArgs: Array[String]): Unit
+
+ final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
+ val separator = "=" * 96
+ val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes
+ output.foreach(_.write(testHeader))
+ func
+ output.foreach(_.write('\n'))
+ }
+ def prefix: String = "rapids-tools-benchmark"
+ def suffix: String = ""
+
+ /**
+ * Any shutdown code to ensure a clean shutdown
+ */
+ def afterAll(): Unit = {}
+
+ def main(args: Array[String]): Unit = {
+ // TODO: get the dirRoot from the arguments instead
+ val dirRoot = ""
+ val resultFileName = "results.txt"
+ val dir = new File(s"$dirRoot/$prefix/")
+ if (!dir.exists()) {
+ dir.mkdirs()
+ }
+ val file = new File(dir, resultFileName)
+ if (!file.exists()) {
+ file.createNewFile()
+ }
+ output = Some(new FileOutputStream(file))
+ runBenchmarkSuite(args)
+ output.foreach { o =>
+ if (o != null) {
+ o.close()
+ }
+ }
+ afterAll()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReporter.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReporter.scala
new file mode 100644
index 000000000..e0fb686fe
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReporter.scala
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util
+
+import com.nvidia.spark.rapids.tool.profiling.AppStatusResult
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.internal.Logging
+
+trait RuntimeReporter extends Logging {
+ val outputDir: String
+ def generateRuntimeReport(hadoopConf: Option[Configuration] = None): Unit = {
+ RuntimeUtil.generateReport(outputDir, hadoopConf)
+ }
+
+ /**
+ * For each app status report, generate an AppStatusResult.
+ * If appId is empty, convert to "N/A" in the output.
+ * @return Seq[AppStatusResult] - Seq[(path, status, appId, message)]
+ */
+ def generateStatusResults(appStatuses: Seq[AppResult]): Seq[AppStatusResult] = {
+ appStatuses.map {
+ case FailureAppResult(path, message) =>
+ AppStatusResult(path, "FAILURE", "N/A", message)
+ case SkippedAppResult(path, message) =>
+ AppStatusResult(path, "SKIPPED", "N/A", message)
+ case SuccessAppResult(path, appId, message) =>
+ AppStatusResult(path, "SUCCESS", appId, message)
+ case UnknownAppResult(path, appId, message) =>
+ val finalAppId = if (appId.isEmpty) "N/A" else appId
+ AppStatusResult(path, "UNKNOWN", finalAppId, message)
+ case profAppResult: AppResult =>
+ throw new UnsupportedOperationException(s"Invalid status for $profAppResult")
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
similarity index 58%
rename from core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala
rename to core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
index d0c31ea47..abe2b55cd 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeReportGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
@@ -19,52 +19,31 @@ package org.apache.spark.sql.rapids.tool.util
import java.io.{PrintWriter, StringWriter}
import com.nvidia.spark.rapids.tool.ToolTextFileWriter
-import com.nvidia.spark.rapids.tool.profiling.AppStatusResult
import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.ToolUtils
-
-trait RuntimeReporter extends Logging {
- val outputDir: String
- def generateRuntimeReport(hadoopConf: Option[Configuration] = None): Unit = {
- RuntimeReportGenerator.generateReport(outputDir, hadoopConf)
- }
-
- /**
- * For each app status report, generate an AppStatusResult.
- * If appId is empty, convert to "N/A" in the output.
- * @return Seq[AppStatusResult] - Seq[(path, status, appId, message)]
- */
- def generateStatusResults(appStatuses: Seq[AppResult]): Seq[AppStatusResult] = {
- appStatuses.map {
- case FailureAppResult(path, message) =>
- AppStatusResult(path, "FAILURE", "N/A", message)
- case SkippedAppResult(path, message) =>
- AppStatusResult(path, "SKIPPED", "N/A", message)
- case SuccessAppResult(path, appId, message) =>
- AppStatusResult(path, "SUCCESS", appId, message)
- case UnknownAppResult(path, appId, message) =>
- val finalAppId = if (appId.isEmpty) "N/A" else appId
- AppStatusResult(path, "UNKNOWN", finalAppId, message)
- case profAppResult: AppResult =>
- throw new UnsupportedOperationException(s"Invalid status for $profAppResult")
- }
- }
-}
-
/**
- * Generates a file containing the properties of the build loaded.
+ * Utility class to pull information about the runtime system and the properties of the build
+ * loaded.
* In addition, it concatenates properties from the runtime (i.e., SparkVersion).
* It is expected that the list of properties in that file will grow depending on whether a
* property helps understanding and investigating the tools output.
- * @param outputDir the directory where the report is generated.
- * @param hadoopConf the hadoop configuration object used to access the HDFS if any.
*/
-object RuntimeReportGenerator extends Logging {
+object RuntimeUtil extends Logging {
private val REPORT_LABEL = "RAPIDS Accelerator for Apache Spark's Build/Runtime Information"
private val REPORT_FILE_NAME = "runtime.properties"
+
+ /**
+ * Generates a file containing the properties of the build loaded.
+ * In addition, it concatenates properties from the runtime (i.e., SparkVersion).
+ * It is expected that the list of properties in that file will grow depending on whether a
+ * property helps understanding and investigating the tools output.
+ *
+ * @param outputDir the directory where the report is generated.
+ * @param hadoopConf the hadoop configuration object used to access the HDFS if any.
+ */
def generateReport(outputDir: String, hadoopConf: Option[Configuration] = None): Unit = {
val buildProps = RapidsToolsConfUtil.loadBuildProperties
// Add the Spark version used in runtime.
@@ -81,6 +60,19 @@ object RuntimeReportGenerator extends Logging {
buildProps.list(new PrintWriter(writer))
logInfo(s"\n$REPORT_LABEL\n${writer.getBuffer.toString}")
}
+
+ /**
+ * Returns a map of the JVM and OS information.
+ * @return Map[String, String] - Map of the JVM and OS information.
+ */
+ def getJVMOSInfo: Map[String, String] = {
+ Map(
+ "jvm.name" -> System.getProperty("java.vm.name"),
+ "jvm.version" -> System.getProperty("java.vm.version"),
+ "os.name" -> System.getProperty("os.name"),
+ "os.version" -> System.getProperty("os.version")
+ )
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsTimer.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsTimer.scala
new file mode 100644
index 000000000..82ce62426
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsTimer.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.tool.util
+
+/**
+ * This code is mostly copied from org.apache.spark.benchmark.Benchmark.Timer
+ *
+ * Utility class to measure timing.
+ * @param iteration specifies this is the nth iteration of running the benchmark case
+ */
+class ToolsTimer(val iteration: Int) {
+ private var accumulatedTime: Long = 0L
+ private var timeStart: Long = 0L
+
+ def startTiming(): Unit = {
+ assert(timeStart == 0L, "Already started timing.")
+ timeStart = System.nanoTime
+ }
+
+ def stopTiming(): Unit = {
+ assert(timeStart != 0L, "Have not started timing.")
+ accumulatedTime += System.nanoTime - timeStart
+ timeStart = 0L
+ }
+
+ def totalTime(): Long = {
+ assert(timeStart == 0L, "Have not stopped timing.")
+ accumulatedTime
+ }
+}
From cf099b62e96d507ae3681fde0bbd2e0d59e33ad7 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 2 Jul 2024 10:05:47 -0500
Subject: [PATCH 02/26] Adding changes for argument parsing
Signed-off-by: Sayed Bilal Bari
---
.../tool/benchmarks/BenchmarkArgs.scala | 29 +++++++++++++++++++
.../tool/benchmarks/BenchmarkBase.scala | 14 +++++++--
2 files changed, 40 insertions(+), 3 deletions(-)
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
new file mode 100644
index 000000000..98c6bf26d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -0,0 +1,29 @@
+package org.apache.spark.rapids.tool.benchmarks
+
+import org.rogach.scallop.{ScallopConf, ScallopOption}
+
+class BenchmarkArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
+
+ banner("""
+Benchmarker class for running various benchmarks.
+ """)
+
+ val iterations: ScallopOption[Int] = opt[Int](name = "iterations", short = 'i', default = Some(5),
+ descr = "Total iterations to run")
+
+ val warmupIterations: ScallopOption[Int] = opt[Int](name = "warmupIterations", short = 'w' ,
+ default = Some(3), descr = "Number of warmup iterations to run")
+
+ val outputDirectory: ScallopOption[String] = opt[String](name = "outputDirectory", short = 'd',
+ default = Some("."), descr = "Directory to write output to")
+
+ val outputFormat: ScallopOption[String] = opt[String](name = "outputFormat", short = 'o',
+ default = Some("tbl"), descr = "Format of output ( tbl, json)")
+
+ val extraArgs: ScallopOption[String] = opt[String](name = "extraArgs" , short = 'a',
+ required = false,
+ descr = "Extra arguments to pass to the benchmark")
+
+ verify()
+
+}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index 400cdedc7..7186eec7c 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -31,7 +31,10 @@ abstract class BenchmarkBase {
* Implementations of this method are supposed to use the wrapper method `runBenchmark`
* for each benchmark scenario.
*/
- def runBenchmarkSuite(mainArgs: Array[String]): Unit
+ def runBenchmarkSuite(iterations: Int,
+ warmUpIterations: Int,
+ outputFormat: String,
+ mainArgs: Array[String]): Unit
final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
val separator = "=" * 96
@@ -49,8 +52,10 @@ abstract class BenchmarkBase {
def afterAll(): Unit = {}
def main(args: Array[String]): Unit = {
+
+ val conf = new BenchmarkArgs(args)
// TODO: get the dirRoot from the arguments instead
- val dirRoot = ""
+ val dirRoot = conf.outputDirectory().stripSuffix("/")
val resultFileName = "results.txt"
val dir = new File(s"$dirRoot/$prefix/")
if (!dir.exists()) {
@@ -61,7 +66,10 @@ abstract class BenchmarkBase {
file.createNewFile()
}
output = Some(new FileOutputStream(file))
- runBenchmarkSuite(args)
+ runBenchmarkSuite(conf.iterations(),
+ conf.warmupIterations(),
+ conf.outputFormat(),
+ conf.extraArgs().split("\\s+").filter(_.nonEmpty))
output.foreach { o =>
if (o != null) {
o.close()
From 1f5155d97936316672574426c63c222acd484077 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 2 Jul 2024 10:11:52 -0500
Subject: [PATCH 03/26] changing warmup time to warmup iterations
Signed-off-by: Sayed Bilal Bari
---
.../spark/rapids/tool/benchmarks/Benchmark.scala | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 4fff5d66d..f08486025 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -39,9 +39,8 @@ import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
class Benchmark(
name: String,
valuesPerIteration: Long,
- minNumIters: Int = 2,
- warmupTime: FiniteDuration = 2.seconds,
- minTime: FiniteDuration = 2.seconds,
+ minNumIters: Int,
+ warmUpIterations: Int,
outputPerIteration: Boolean = false,
output: Option[OutputStream] = None) {
import Benchmark._
@@ -121,16 +120,16 @@ class Benchmark(
*/
def measure(num: Long, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
System.gc() // ensures garbage from previous cases don't impact this one
- val warmupDeadline = warmupTime.fromNow
- while (!warmupDeadline.isOverdue) {
+ var wi = 0
+ while (wi < warmUpIterations) {
f(new ToolsTimer(-1))
+ wi += 1
}
val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
- val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
val runTimes = ArrayBuffer[Long]()
var totalTime = 0L
var i = 0
- while (i < minIters || totalTime < minDuration) {
+ while (i < minIters) {
val timer = new ToolsTimer(i)
f(timer)
val runTime = timer.totalTime()
From ad4e562fd67672ce154d375e363e5c189caa9942 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 2 Jul 2024 10:17:20 -0500
Subject: [PATCH 04/26] Removing unused imports
Signed-off-by: Sayed Bilal Bari
---
.../org/apache/spark/rapids/tool/benchmarks/Benchmark.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index f08486025..2b666525a 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -20,7 +20,7 @@ import java.io.{OutputStream, PrintStream}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration.{DurationInt, FiniteDuration, NANOSECONDS}
+import scala.concurrent.duration.NANOSECONDS
import org.apache.commons.io.output.TeeOutputStream
From 303ee60ef2c620cb20a440db60be5e329dc5a0e3 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 2 Jul 2024 10:32:53 -0500
Subject: [PATCH 05/26] Adding Qualification Benchmark
Signed-off-by: Sayed Bilal Bari
---
.../benchmarks/QualificationBenchmark.scala | 27 +++++++++++++++++++
1 file changed, 27 insertions(+)
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
new file mode 100644
index 000000000..b5807d355
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
@@ -0,0 +1,27 @@
+package org.apache.spark.rapids.tool.benchmarks
+
+import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
+import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
+
+object QualificationBenchmark extends BenchmarkBase {
+ override def runBenchmarkSuite(iterations: Int,
+ warmUpIterations: Int,
+ outputFormat: String,
+ mainArgs: Array[String]): Unit = {
+ runBenchmark("QualificationBenchmark") {
+ val benchmarker =
+ new Benchmark(
+ "QualificationBenchmark",
+ 2,
+ output = output,
+ outputPerIteration = true,
+ warmUpIterations = warmUpIterations,
+ minNumIters = iterations)
+ benchmarker.addCase("QualificationBenchmark") { _ =>
+ mainInternal(new QualificationArgs(mainArgs),
+ printStdout = true, enablePB = true)
+ }
+ benchmarker.run()
+ }
+ }
+}
From 61370218a2a6da9f89c68091b6c65f52591dc92c Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 2 Jul 2024 13:12:05 -0500
Subject: [PATCH 06/26] Changes for review comments
Signed-off-by: Sayed Bilal Bari
---
.../rapids/tool/benchmarks/Benchmark.scala | 31 ++++++-------
.../tool/benchmarks/BenchmarkArgs.scala | 44 ++++++++++++++-----
.../tool/benchmarks/BenchmarkBase.scala | 1 -
.../benchmarks/QualificationBenchmark.scala | 24 ++++++++++
4 files changed, 72 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 2b666525a..f4359c4a6 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -87,9 +87,12 @@ class Benchmark(
def run(): Unit = {
require(benchmarks.nonEmpty)
// scalastyle:off
+ println("-" * 80)
println("Running benchmark: " + name)
+ println("-" * 80)
val results = benchmarks.map { c =>
- println(" Running case: " + c.name)
+ println(" RUNNING CASE : " + c.name)
+ println("-" * 80)
measure(valuesPerIteration, c.numIters)(c.fn)
}
println
@@ -98,17 +101,15 @@ class Benchmark(
// The results are going to be processor specific so it is useful to include that.
out.println(RuntimeUtil.getJVMOSInfo.mkString("\n"))
val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
- out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
- name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)", "Per Row(ns)", "Relative")
+ out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
+ name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Relative")
out.println("-" * (nameLen + 80))
results.zip(benchmarks).foreach { case (result, benchmark) =>
- out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+ out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
benchmark.name,
"%5.0f" format result.bestMs,
"%4.0f" format result.avgMs,
"%5.0f" format result.stdevMs,
- "%10.1f" format result.bestRate,
- "%6.1f" format (1000 / result.bestRate),
"%3.1fX" format (firstBest / result.bestMs))
}
out.println()
@@ -120,16 +121,13 @@ class Benchmark(
*/
def measure(num: Long, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
System.gc() // ensures garbage from previous cases don't impact this one
- var wi = 0
- while (wi < warmUpIterations) {
+ for (wi <- 0 until warmUpIterations) {
f(new ToolsTimer(-1))
- wi += 1
}
val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
val runTimes = ArrayBuffer[Long]()
var totalTime = 0L
- var i = 0
- while (i < minIters) {
+ for (i <- 0 until minIters) {
val timer = new ToolsTimer(i)
f(timer)
val runTime = timer.totalTime()
@@ -138,13 +136,16 @@ class Benchmark(
if (outputPerIteration) {
// scalastyle:off
+ println("*"*80)
println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
+ println("*"*80)
// scalastyle:on
}
- i += 1
}
// scalastyle:off
- println(s" Stopped after $i iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+ println("*"*80)
+ println(s" Stopped after $minIters iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
+ println("*"*80)
// scalastyle:on
assert(runTimes.nonEmpty)
val best = runTimes.min
@@ -152,12 +153,12 @@ class Benchmark(
val stdev = if (runTimes.size > 1) {
math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
} else 0
- Benchmark.Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
+ Benchmark.Result(avg / 1000000.0, best / 1000000.0, stdev / 1000000.0)
}
}
object Benchmark {
case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
- case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
+ case class Result(avgMs: Double, bestMs: Double, stdevMs: Double)
}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
index 98c6bf26d..ca18e3b3e 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.rapids.tool.benchmarks
import org.rogach.scallop.{ScallopConf, ScallopOption}
@@ -9,21 +25,25 @@ Benchmarker class for running various benchmarks.
""")
val iterations: ScallopOption[Int] = opt[Int](name = "iterations", short = 'i', default = Some(5),
- descr = "Total iterations to run")
-
+ descr = "Total iterations to run excluding warmup (for avg time calculation)." +
+ " Default is 5 iterations", validate = _ > 0)
val warmupIterations: ScallopOption[Int] = opt[Int](name = "warmupIterations", short = 'w' ,
- default = Some(3), descr = "Number of warmup iterations to run")
-
- val outputDirectory: ScallopOption[String] = opt[String](name = "outputDirectory", short = 'd',
- default = Some("."), descr = "Directory to write output to")
-
- val outputFormat: ScallopOption[String] = opt[String](name = "outputFormat", short = 'o',
- default = Some("tbl"), descr = "Format of output ( tbl, json)")
-
+ default = Some(3), descr = "Total number of warmup iterations to run. Can take " +
+ "any input >=0. Warm up is important for benchmarking to ensure initial " +
+ "JVM operations do not skew the result ( classloading etc. )", validate = _ >= 0)
+ val outputDirectory: ScallopOption[String] = opt[String](name = "outputDirectory", short = 'o',
+ default = Some("."), descr = "Base output directory for benchmark results. " +
+ "Default is current directory. The final output will go into a subdirectory called" +
+ " rapids-tools-benchmark. It will override any directory with the same name")
+ val outputFormat: ScallopOption[String] = opt[String](name = "outputFormat", short = 'f',
+ default = Some("text"), descr = "Output format for the benchmark results. For text" +
+ " the result output will be tabular. In case of json , the results" +
+ "will be JSON formatted. Currently supported formats are text, json")
val extraArgs: ScallopOption[String] = opt[String](name = "extraArgs" , short = 'a',
required = false,
- descr = "Extra arguments to pass to the benchmark")
-
+ descr = "Extra arguments to pass to the benchmark.These arguments will be sent to the " +
+ "benchmark class. The format is space separated arguments. For example " +
+ "--output-directory /tmp --per-sql /tmp/eventlogs")
verify()
}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index 7186eec7c..02adf5299 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -54,7 +54,6 @@ abstract class BenchmarkBase {
def main(args: Array[String]): Unit = {
val conf = new BenchmarkArgs(args)
- // TODO: get the dirRoot from the arguments instead
val dirRoot = conf.outputDirectory().stripSuffix("/")
val resultFileName = "results.txt"
val dir = new File(s"$dirRoot/$prefix/")
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
index b5807d355..b64098f64 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
@@ -1,8 +1,32 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.rapids.tool.benchmarks
import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
+/*
+ * This class is used to run the QualificationMain class as a benchmark.
+ * This can be used as a reference to write any benchmark class
+ * Usage -
+ * 1. Override the runBenchmarkSuite method
+ * 2. Write the benchmark code in the runBenchmark method passing relevant arguments
+ * 3. Write benchmarked code inside
+ */
object QualificationBenchmark extends BenchmarkBase {
override def runBenchmarkSuite(iterations: Int,
warmUpIterations: Int,
From 4f85b8da797bf60577f78b1118e3e26b2000be97 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 2 Jul 2024 15:35:53 -0500
Subject: [PATCH 07/26] Changes for review comments
Signed-off-by: Sayed Bilal Bari
---
.../rapids/tool/benchmarks/BenchmarkArgs.scala | 1 -
.../rapids/tool/benchmarks/BenchmarkBase.scala | 12 ++++++------
.../tool/benchmarks/QualificationBenchmark.scala | 16 ++++++++--------
3 files changed, 14 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
index ca18e3b3e..204b72b7f 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -45,5 +45,4 @@ Benchmarker class for running various benchmarks.
"benchmark class. The format is space separated arguments. For example " +
"--output-directory /tmp --per-sql /tmp/eventlogs")
verify()
-
}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index 02adf5299..a2e52cd70 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -53,8 +53,8 @@ abstract class BenchmarkBase {
def main(args: Array[String]): Unit = {
- val conf = new BenchmarkArgs(args)
- val dirRoot = conf.outputDirectory().stripSuffix("/")
+ val benchArgs = new BenchmarkArgs(args)
+ val dirRoot = benchArgs.outputDirectory().stripSuffix("/")
val resultFileName = "results.txt"
val dir = new File(s"$dirRoot/$prefix/")
if (!dir.exists()) {
@@ -65,10 +65,10 @@ abstract class BenchmarkBase {
file.createNewFile()
}
output = Some(new FileOutputStream(file))
- runBenchmarkSuite(conf.iterations(),
- conf.warmupIterations(),
- conf.outputFormat(),
- conf.extraArgs().split("\\s+").filter(_.nonEmpty))
+ runBenchmarkSuite(benchArgs.iterations(),
+ benchArgs.warmupIterations(),
+ benchArgs.outputFormat(),
+ benchArgs.extraArgs().split("\\s+").filter(_.nonEmpty))
output.foreach { o =>
if (o != null) {
o.close()
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
index b64098f64..17712739f 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
@@ -19,14 +19,14 @@ package org.apache.spark.rapids.tool.benchmarks
import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
-/*
- * This class is used to run the QualificationMain class as a benchmark.
- * This can be used as a reference to write any benchmark class
- * Usage -
- * 1. Override the runBenchmarkSuite method
- * 2. Write the benchmark code in the runBenchmark method passing relevant arguments
- * 3. Write benchmarked code inside
- */
+/**
+ * This class is used to run the QualificationMain class as a benchmark.
+ * This can be used as a reference to write any benchmark class
+ * Usage -
+ * 1. Override the runBenchmarkSuite method
+ * 2. Write the benchmark code in the runBenchmark method passing relevant arguments
+ * 3. Write benchmarked code inside
+ */
object QualificationBenchmark extends BenchmarkBase {
override def runBenchmarkSuite(iterations: Int,
warmUpIterations: Int,
From 0131c71f094b1721e8888fe3bd9144aa1ef11c0c Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 2 Jul 2024 15:40:09 -0500
Subject: [PATCH 08/26] removing name param from scallop options
Signed-off-by: Sayed Bilal Bari
---
.../spark/rapids/tool/benchmarks/BenchmarkArgs.scala | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
index 204b72b7f..a7c0e99ff 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -24,22 +24,22 @@ class BenchmarkArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
Benchmarker class for running various benchmarks.
""")
- val iterations: ScallopOption[Int] = opt[Int](name = "iterations", short = 'i', default = Some(5),
+ val iterations: ScallopOption[Int] = opt[Int](short = 'i', default = Some(5),
descr = "Total iterations to run excluding warmup (for avg time calculation)." +
" Default is 5 iterations", validate = _ > 0)
- val warmupIterations: ScallopOption[Int] = opt[Int](name = "warmupIterations", short = 'w' ,
+ val warmupIterations: ScallopOption[Int] = opt[Int](short = 'w' ,
default = Some(3), descr = "Total number of warmup iterations to run. Can take " +
"any input >=0. Warm up is important for benchmarking to ensure initial " +
"JVM operations do not skew the result ( classloading etc. )", validate = _ >= 0)
- val outputDirectory: ScallopOption[String] = opt[String](name = "outputDirectory", short = 'o',
+ val outputDirectory: ScallopOption[String] = opt[String](short = 'o',
default = Some("."), descr = "Base output directory for benchmark results. " +
"Default is current directory. The final output will go into a subdirectory called" +
" rapids-tools-benchmark. It will override any directory with the same name")
- val outputFormat: ScallopOption[String] = opt[String](name = "outputFormat", short = 'f',
+ val outputFormat: ScallopOption[String] = opt[String](short = 'f',
default = Some("text"), descr = "Output format for the benchmark results. For text" +
" the result output will be tabular. In case of json , the results" +
"will be JSON formatted. Currently supported formats are text, json")
- val extraArgs: ScallopOption[String] = opt[String](name = "extraArgs" , short = 'a',
+ val extraArgs: ScallopOption[String] = opt[String](short = 'a',
required = false,
descr = "Extra arguments to pass to the benchmark.These arguments will be sent to the " +
"benchmark class. The format is space separated arguments. For example " +
From 1026e697b569d99e80ee52948d34b90cdd29f62f Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari <34418972+bilalbari@users.noreply.github.com>
Date: Fri, 5 Jul 2024 14:26:09 -0500
Subject: [PATCH 09/26] Adding GC Metrics (#12)
* Adding GC Metrics
Signed-off-by: Sayed Bilal Bari
* Review comment changes
Signed-off-by: Sayed Bilal Bari
* Correcting output format + refactoring
Signed-off-by: Sayed Bilal Bari
* Output Formatting Changes
Signed-off-by: Sayed Bilal Bari
* Formatting + Making qual bench single threaded
Signed-off-by: Sayed Bilal Bari
---------
Signed-off-by: Sayed Bilal Bari
Co-authored-by: Sayed Bilal Bari
---
.../rapids/tool/benchmarks/Benchmark.scala | 69 ++++++++++++++-----
... => SingleThreadedQualToolBenchmark.scala} | 19 +++--
.../tool/util/MemoryMetricsTracker.scala | 32 +++++++++
.../sql/rapids/tool/util/RuntimeUtil.scala | 2 +-
4 files changed, 96 insertions(+), 26 deletions(-)
rename core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/{QualificationBenchmark.scala => SingleThreadedQualToolBenchmark.scala} (70%)
create mode 100644 core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index f4359c4a6..40dca59a9 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration.NANOSECONDS
import org.apache.commons.io.output.TeeOutputStream
-import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
+import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, RuntimeUtil, ToolsTimer}
/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
@@ -37,7 +37,7 @@ import org.apache.spark.sql.rapids.tool.util.{RuntimeUtil, ToolsTimer}
* This will output the average time to run each function and the rate of each function.
*/
class Benchmark(
- name: String,
+ name: String = "Benchmarker",
valuesPerIteration: Long,
minNumIters: Int,
warmUpIterations: Int,
@@ -99,18 +99,31 @@ class Benchmark(
val firstBest = results.head.bestMs
// The results are going to be processor specific so it is useful to include that.
- out.println(RuntimeUtil.getJVMOSInfo.mkString("\n"))
+ val jvmInfo = RuntimeUtil.getJVMOSInfo
+ out.printf(s"%-26s : %s \n","JVM Name", jvmInfo("jvm.name"))
+ out.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
+ out.printf(s"%-26s : %s \n","OS Name", jvmInfo("os.name"))
+ out.printf(s"%-26s : %s \n","OS Version", jvmInfo("os.version"))
+ out.printf(s"%-26s : %s MB \n","MaxHeapMemory", (Runtime.getRuntime.maxMemory()/1024/1024).toString)
+ out.printf(s"%-26s : %s \n","Total Warm Up Iterations", warmUpIterations.toString)
+ out.printf(s"%-26s : %s \n \n","Total Runtime Iterations", minNumIters.toString)
val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
- out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
- name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Relative")
- out.println("-" * (nameLen + 80))
+ out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
+ name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)","Avg GC Time(ms)",
+ "Avg GC Count", "Stdev GC Count","Max GC Time(ms)","Max GC Count", "Relative")
+ out.println("-" * (nameLen + 160))
results.zip(benchmarks).foreach { case (result, benchmark) =>
- out.printf(s"%-${nameLen}s %14s %14s %11s %10s\n",
+ out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
benchmark.name,
"%5.0f" format result.bestMs,
"%4.0f" format result.avgMs,
"%5.0f" format result.stdevMs,
- "%3.1fX" format (firstBest / result.bestMs))
+ "%5.1f" format result.memoryParams.avgGCTime,
+ "%5.1f" format result.memoryParams.avgGCCount,
+ "%5.0f" format result.memoryParams.stdDevGCCount,
+ "%5d" format result.memoryParams.maxGcTime,
+ "%5d" format result.memoryParams.maxGCCount,
+ "%3.2fX" format (firstBest / result.bestMs))
}
out.println()
}
@@ -126,14 +139,17 @@ class Benchmark(
}
val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
val runTimes = ArrayBuffer[Long]()
- var totalTime = 0L
+ val gcCounts = ArrayBuffer[Long]()
+ val gcTimes = ArrayBuffer[Long]()
+ //For tracking maximum GC over iterations
for (i <- 0 until minIters) {
val timer = new ToolsTimer(i)
+ val memoryTracker = new MemoryMetricsTracker
f(timer)
val runTime = timer.totalTime()
runTimes += runTime
- totalTime += runTime
-
+ gcCounts += memoryTracker.getTotalGCCount
+ gcTimes += memoryTracker.getTotalGCTime
if (outputPerIteration) {
// scalastyle:off
println("*"*80)
@@ -148,17 +164,34 @@ class Benchmark(
println("*"*80)
// scalastyle:on
assert(runTimes.nonEmpty)
- val best = runTimes.min
- val avg = runTimes.sum / runTimes.size
- val stdev = if (runTimes.size > 1) {
- math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
- } else 0
- Benchmark.Result(avg / 1000000.0, best / 1000000.0, stdev / 1000000.0)
+ val bestRuntime = runTimes.min
+ val avgRuntime = runTimes.sum / runTimes.size
+ val stdevRunTime = if (runTimes.size > 1) {
+ math.sqrt(runTimes.map(time => (time - avgRuntime) *
+ (time - avgRuntime)).sum / (runTimes.size - 1))
+ } else {
+ 0
+ }
+ val maxGcCount = gcCounts.max
+ val stdevGcCount = if (gcCounts.size > 1) {
+ math.sqrt(gcCounts.map(gc => (gc - maxGcCount) *
+ (gc - maxGcCount)).sum / (gcCounts.size - 1))
+ } else {
+ 0
+ }
+ val avgGcCount = gcCounts.sum / minIters
+ val avgGcTime = gcTimes.sum / minIters
+ val maxGcTime = gcTimes.max
+ Benchmark.Result(avgRuntime / 1000000.0, bestRuntime / 1000000.0, stdevRunTime / 1000000.0,
+ JVMMemoryParams(avgGcTime, avgGcCount, stdevGcCount, maxGcCount, maxGcTime))
}
}
object Benchmark {
case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
- case class Result(avgMs: Double, bestMs: Double, stdevMs: Double)
+ case class JVMMemoryParams( avgGCTime:Double, avgGCCount:Double,
+ stdDevGCCount: Double, maxGCCount: Long, maxGcTime:Long)
+ case class Result(avgMs: Double, bestMs: Double, stdevMs: Double,
+ memoryParams: JVMMemoryParams)
}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
similarity index 70%
rename from core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
rename to core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
index 17712739f..12b3f3c6d 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/QualificationBenchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
@@ -27,23 +27,28 @@ import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
* 2. Write the benchmark code in the runBenchmark method passing relevant arguments
* 3. Write benchmarked code inside
*/
-object QualificationBenchmark extends BenchmarkBase {
+object SingleThreadedQualToolBenchmark extends BenchmarkBase {
override def runBenchmarkSuite(iterations: Int,
warmUpIterations: Int,
outputFormat: String,
mainArgs: Array[String]): Unit = {
- runBenchmark("QualificationBenchmark") {
+ runBenchmark("Benchmark_Per_SQL_Arg_Qualification") {
val benchmarker =
new Benchmark(
- "QualificationBenchmark",
- 2,
+ valuesPerIteration = 2,
output = output,
outputPerIteration = true,
warmUpIterations = warmUpIterations,
minNumIters = iterations)
- benchmarker.addCase("QualificationBenchmark") { _ =>
- mainInternal(new QualificationArgs(mainArgs),
- printStdout = true, enablePB = true)
+ val (prefix,suffix) = mainArgs.splitAt(mainArgs.length - 1)
+ benchmarker.addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
+ mainInternal(new QualificationArgs(prefix :+ "--per-sql" :+ "--num-threads"
+ :+ "1" :+ suffix.head),
+ enablePB = true)
+ }
+ benchmarker.addCase("Disable_Per_SQL_Arg_Qualification") { _ =>
+ mainInternal(new QualificationArgs(prefix :+ "--num-threads" :+ "1" :+ suffix.head),
+ enablePB = true)
}
benchmarker.run()
}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala
new file mode 100644
index 000000000..c796e413c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala
@@ -0,0 +1,32 @@
+package org.apache.spark.sql.rapids.tool.util
+
+import java.lang.management.ManagementFactory
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
+/**
+ * Utility class to track memory metrics.
+ * This class is used to track memory metrics such as GC count, GC time,
+ * heap memory usage, etc.
+ *
+ */
+class MemoryMetricsTracker {
+ private val startGCMetrics = getCurrentGCMetrics
+
+ private def getCurrentGCMetrics: (Long, Long) = {
+ val gcBeans = ManagementFactory.getGarbageCollectorMXBeans
+
+ (gcBeans.map(_.getCollectionCount).sum,
+ gcBeans.map(_.getCollectionTime).sum)
+ }
+
+ def getTotalGCCount: Long = {
+ val (newGcCount:Long, _) = getCurrentGCMetrics
+ newGcCount - startGCMetrics._1
+ }
+
+ def getTotalGCTime: Long = {
+ val (_, newGcTime:Long) = getCurrentGCMetrics
+ newGcTime - startGCMetrics._2
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
index abe2b55cd..d39323c7a 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/RuntimeUtil.scala
@@ -68,7 +68,7 @@ object RuntimeUtil extends Logging {
def getJVMOSInfo: Map[String, String] = {
Map(
"jvm.name" -> System.getProperty("java.vm.name"),
- "jvm.version" -> System.getProperty("java.vm.version"),
+ "jvm.version" -> System.getProperty("java.version"),
"os.name" -> System.getProperty("os.name"),
"os.version" -> System.getProperty("os.version")
)
From d9a8be1fe4903636bdcda425b1bf1e0be285dd62 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 9 Jul 2024 10:03:29 -0500
Subject: [PATCH 10/26] Review changes
Signed-off-by: Sayed Bilal Bari
---
.../apache/spark/rapids/tool/benchmarks/Benchmark.scala | 6 ------
.../spark/rapids/tool/benchmarks/BenchmarkBase.scala | 7 ++++---
.../tool/benchmarks/SingleThreadedQualToolBenchmark.scala | 4 ++--
3 files changed, 6 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 40dca59a9..29e9412d4 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -86,7 +86,6 @@ class Benchmark(
*/
def run(): Unit = {
require(benchmarks.nonEmpty)
- // scalastyle:off
println("-" * 80)
println("Running benchmark: " + name)
println("-" * 80)
@@ -98,7 +97,6 @@ class Benchmark(
println
val firstBest = results.head.bestMs
- // The results are going to be processor specific so it is useful to include that.
val jvmInfo = RuntimeUtil.getJVMOSInfo
out.printf(s"%-26s : %s \n","JVM Name", jvmInfo("jvm.name"))
out.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
@@ -151,18 +149,14 @@ class Benchmark(
gcCounts += memoryTracker.getTotalGCCount
gcTimes += memoryTracker.getTotalGCTime
if (outputPerIteration) {
- // scalastyle:off
println("*"*80)
println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
println("*"*80)
- // scalastyle:on
}
}
- // scalastyle:off
println("*"*80)
println(s" Stopped after $minIters iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
println("*"*80)
- // scalastyle:on
assert(runTimes.nonEmpty)
val bestRuntime = runTimes.min
val avgRuntime = runTimes.sum / runTimes.size
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index a2e52cd70..c8f09e1b8 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -21,8 +21,9 @@ import java.io.{File, FileOutputStream, OutputStream}
/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
*
- * A base class for generate benchmark results to a file.
- * For JDK9+, JDK major version number is added to the file names to distinguish the results.
+ * A base class for generating benchmark results to a file.
+ * For JDK9+, JDK major version number is added to the result file to distinguish
+ * the generated results.
*/
abstract class BenchmarkBase {
var output: Option[OutputStream] = None
@@ -34,7 +35,7 @@ abstract class BenchmarkBase {
def runBenchmarkSuite(iterations: Int,
warmUpIterations: Int,
outputFormat: String,
- mainArgs: Array[String]): Unit
+ extraArgs: Array[String]): Unit
final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
val separator = "=" * 96
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
index 12b3f3c6d..7b39135a9 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
@@ -31,7 +31,7 @@ object SingleThreadedQualToolBenchmark extends BenchmarkBase {
override def runBenchmarkSuite(iterations: Int,
warmUpIterations: Int,
outputFormat: String,
- mainArgs: Array[String]): Unit = {
+ extraArgs: Array[String]): Unit = {
runBenchmark("Benchmark_Per_SQL_Arg_Qualification") {
val benchmarker =
new Benchmark(
@@ -40,7 +40,7 @@ object SingleThreadedQualToolBenchmark extends BenchmarkBase {
outputPerIteration = true,
warmUpIterations = warmUpIterations,
minNumIters = iterations)
- val (prefix,suffix) = mainArgs.splitAt(mainArgs.length - 1)
+ val (prefix, suffix) = extraArgs.splitAt(extraArgs.length - 1)
benchmarker.addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
mainInternal(new QualificationArgs(prefix :+ "--per-sql" :+ "--num-threads"
:+ "1" :+ suffix.head),
From 9033c821c2d3ec03a9f32689d8d79305050e4453 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Tue, 9 Jul 2024 10:14:08 -0500
Subject: [PATCH 11/26] Correcting scalastyle failure
Signed-off-by: Sayed Bilal Bari
---
.../org/apache/spark/rapids/tool/benchmarks/Benchmark.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 29e9412d4..e59218e8c 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -102,7 +102,8 @@ class Benchmark(
out.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
out.printf(s"%-26s : %s \n","OS Name", jvmInfo("os.name"))
out.printf(s"%-26s : %s \n","OS Version", jvmInfo("os.version"))
- out.printf(s"%-26s : %s MB \n","MaxHeapMemory", (Runtime.getRuntime.maxMemory()/1024/1024).toString)
+ out.printf(s"%-26s : %s MB \n","MaxHeapMemory",
+ (Runtime.getRuntime.maxMemory()/1024/1024).toString)
out.printf(s"%-26s : %s \n","Total Warm Up Iterations", warmUpIterations.toString)
out.printf(s"%-26s : %s \n \n","Total Runtime Iterations", minNumIters.toString)
val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
From 9199881a28eb88f82f961e81ef9022006041c4d8 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Wed, 10 Jul 2024 11:21:31 -0500
Subject: [PATCH 12/26] Correcting passed argument name for semantic clarity
Signed-off-by: Sayed Bilal Bari
---
.../spark/rapids/tool/benchmarks/BenchmarkArgs.scala | 6 +++---
.../spark/rapids/tool/benchmarks/BenchmarkBase.scala | 4 ++--
.../tool/benchmarks/SingleThreadedQualToolBenchmark.scala | 7 +++++--
3 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
index a7c0e99ff..03e115660 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -39,10 +39,10 @@ Benchmarker class for running various benchmarks.
default = Some("text"), descr = "Output format for the benchmark results. For text" +
" the result output will be tabular. In case of json , the results" +
"will be JSON formatted. Currently supported formats are text, json")
- val extraArgs: ScallopOption[String] = opt[String](short = 'a',
+ val inputArgs: ScallopOption[String] = opt[String](short = 'a',
required = false,
- descr = "Extra arguments to pass to the benchmark.These arguments will be sent to the " +
- "benchmark class. The format is space separated arguments. For example " +
+ descr = "Input arguments to pass to the benchmark suite. The usage is relevant in cases of " +
+ "common arguments across benchmark. The format is space separated arguments. For example " +
"--output-directory /tmp --per-sql /tmp/eventlogs")
verify()
}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index c8f09e1b8..36bf9d42c 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -35,7 +35,7 @@ abstract class BenchmarkBase {
def runBenchmarkSuite(iterations: Int,
warmUpIterations: Int,
outputFormat: String,
- extraArgs: Array[String]): Unit
+ inputArgs: Array[String]): Unit
final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
val separator = "=" * 96
@@ -69,7 +69,7 @@ abstract class BenchmarkBase {
runBenchmarkSuite(benchArgs.iterations(),
benchArgs.warmupIterations(),
benchArgs.outputFormat(),
- benchArgs.extraArgs().split("\\s+").filter(_.nonEmpty))
+ benchArgs.inputArgs().split("\\s+").filter(_.nonEmpty))
output.foreach { o =>
if (o != null) {
o.close()
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
index 7b39135a9..e28e36ec4 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
@@ -31,7 +31,10 @@ object SingleThreadedQualToolBenchmark extends BenchmarkBase {
override def runBenchmarkSuite(iterations: Int,
warmUpIterations: Int,
outputFormat: String,
- extraArgs: Array[String]): Unit = {
+ inputArgs: Array[String]): Unit = {
+ // Currently the input arguments are assumed to be common across cases
+ // This will be improved in a follow up PR to enable passing as a config
+ // file with argument support for different cases
runBenchmark("Benchmark_Per_SQL_Arg_Qualification") {
val benchmarker =
new Benchmark(
@@ -40,7 +43,7 @@ object SingleThreadedQualToolBenchmark extends BenchmarkBase {
outputPerIteration = true,
warmUpIterations = warmUpIterations,
minNumIters = iterations)
- val (prefix, suffix) = extraArgs.splitAt(extraArgs.length - 1)
+ val (prefix, suffix) = inputArgs.splitAt(inputArgs.length - 1)
benchmarker.addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
mainInternal(new QualificationArgs(prefix :+ "--per-sql" :+ "--num-threads"
:+ "1" :+ suffix.head),
From 3001cb37b567317ffd9fc741bd932ef72a6aa5d5 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Wed, 10 Jul 2024 11:36:33 -0500
Subject: [PATCH 13/26] Short flag + desc - update
Signed-off-by: Sayed Bilal Bari
---
.../apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
index 03e115660..cdc291240 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -39,10 +39,10 @@ Benchmarker class for running various benchmarks.
default = Some("text"), descr = "Output format for the benchmark results. For text" +
" the result output will be tabular. In case of json , the results" +
"will be JSON formatted. Currently supported formats are text, json")
- val inputArgs: ScallopOption[String] = opt[String](short = 'a',
+ val inputArgs: ScallopOption[String] = opt[String](short = 'i',
required = false,
- descr = "Input arguments to pass to the benchmark suite. The usage is relevant in cases of " +
- "common arguments across benchmark. The format is space separated arguments. For example " +
+ descr = "Input arguments to pass to the benchmark suite. Used as common arguments across " +
+ "benchmarks. The format is space separated arguments. For example " +
"--output-directory /tmp --per-sql /tmp/eventlogs")
verify()
}
From 118a5056daf068a729959937de8d864063a83b36 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Wed, 10 Jul 2024 11:45:36 -0500
Subject: [PATCH 14/26] Updating short flag usage
Signed-off-by: Sayed Bilal Bari
---
.../apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
index cdc291240..c5c0847f4 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkArgs.scala
@@ -39,7 +39,9 @@ Benchmarker class for running various benchmarks.
default = Some("text"), descr = "Output format for the benchmark results. For text" +
" the result output will be tabular. In case of json , the results" +
"will be JSON formatted. Currently supported formats are text, json")
- val inputArgs: ScallopOption[String] = opt[String](short = 'i',
+ // Conflict with `iterations` for using short flag - `i`.
+ // Going with - `a` for now
+ val inputArgs: ScallopOption[String] = opt[String](short = 'a',
required = false,
descr = "Input arguments to pass to the benchmark suite. Used as common arguments across " +
"benchmarks. The format is space separated arguments. For example " +
From eaa6a2c5c6bb9e4fc9e48ecdcfce116445ac0b57 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Wed, 10 Jul 2024 15:36:17 -0500
Subject: [PATCH 15/26] Refactor for correcting structure
Signed-off-by: Sayed Bilal Bari
---
.../rapids/tool/benchmarks/Benchmark.scala | 58 ++--------
.../tool/benchmarks/BenchmarkBase.scala | 103 +++++++++++++++---
.../SingleThreadedQualToolBenchmark.scala | 18 +--
3 files changed, 101 insertions(+), 78 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index e59218e8c..7c2151bb6 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -16,15 +16,11 @@
package org.apache.spark.rapids.tool.benchmarks
-import java.io.{OutputStream, PrintStream}
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.NANOSECONDS
-import org.apache.commons.io.output.TeeOutputStream
-
-import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, RuntimeUtil, ToolsTimer}
+import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, ToolsTimer}
/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
@@ -38,19 +34,12 @@ import org.apache.spark.sql.rapids.tool.util.{MemoryMetricsTracker, RuntimeUtil,
*/
class Benchmark(
name: String = "Benchmarker",
- valuesPerIteration: Long,
minNumIters: Int,
warmUpIterations: Int,
- outputPerIteration: Boolean = false,
- output: Option[OutputStream] = None) {
+ outputPerIteration: Boolean = false) {
import Benchmark._
val benchmarks: mutable.ArrayBuffer[Case] = mutable.ArrayBuffer.empty[Benchmark.Case]
- val out: PrintStream = if (output.isDefined) {
- new PrintStream(new TeeOutputStream(System.out, output.get))
- } else {
- System.out
- }
/**
* Adds a case to run when run() is called. The given function will be run for several
@@ -84,7 +73,7 @@ class Benchmark(
* a comment with the benchmark. Although the results vary from machine to machine, it should
* provide some baseline.
*/
- def run(): Unit = {
+ def run(): mutable.ArrayBuffer[Result] = {
require(benchmarks.nonEmpty)
println("-" * 80)
println("Running benchmark: " + name)
@@ -92,46 +81,17 @@ class Benchmark(
val results = benchmarks.map { c =>
println(" RUNNING CASE : " + c.name)
println("-" * 80)
- measure(valuesPerIteration, c.numIters)(c.fn)
+ measure(c.name, c.numIters)(c.fn)
}
println
-
- val firstBest = results.head.bestMs
- val jvmInfo = RuntimeUtil.getJVMOSInfo
- out.printf(s"%-26s : %s \n","JVM Name", jvmInfo("jvm.name"))
- out.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
- out.printf(s"%-26s : %s \n","OS Name", jvmInfo("os.name"))
- out.printf(s"%-26s : %s \n","OS Version", jvmInfo("os.version"))
- out.printf(s"%-26s : %s MB \n","MaxHeapMemory",
- (Runtime.getRuntime.maxMemory()/1024/1024).toString)
- out.printf(s"%-26s : %s \n","Total Warm Up Iterations", warmUpIterations.toString)
- out.printf(s"%-26s : %s \n \n","Total Runtime Iterations", minNumIters.toString)
- val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
- out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
- name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)","Avg GC Time(ms)",
- "Avg GC Count", "Stdev GC Count","Max GC Time(ms)","Max GC Count", "Relative")
- out.println("-" * (nameLen + 160))
- results.zip(benchmarks).foreach { case (result, benchmark) =>
- out.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
- benchmark.name,
- "%5.0f" format result.bestMs,
- "%4.0f" format result.avgMs,
- "%5.0f" format result.stdevMs,
- "%5.1f" format result.memoryParams.avgGCTime,
- "%5.1f" format result.memoryParams.avgGCCount,
- "%5.0f" format result.memoryParams.stdDevGCCount,
- "%5d" format result.memoryParams.maxGcTime,
- "%5d" format result.memoryParams.maxGCCount,
- "%3.2fX" format (firstBest / result.bestMs))
- }
- out.println()
+ results
}
/**
* Runs a single function `f` for iters, returning the average time the function took and
* the rate of the function.
*/
- def measure(num: Long, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
+ def measure(name: String, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
System.gc() // ensures garbage from previous cases don't impact this one
for (wi <- 0 until warmUpIterations) {
f(new ToolsTimer(-1))
@@ -177,7 +137,9 @@ class Benchmark(
val avgGcCount = gcCounts.sum / minIters
val avgGcTime = gcTimes.sum / minIters
val maxGcTime = gcTimes.max
- Benchmark.Result(avgRuntime / 1000000.0, bestRuntime / 1000000.0, stdevRunTime / 1000000.0,
+ Benchmark.Result(name, avgRuntime / 1000000.0,
+ bestRuntime / 1000000.0,
+ stdevRunTime / 1000000.0,
JVMMemoryParams(avgGcTime, avgGcCount, stdevGcCount, maxGcCount, maxGcTime))
}
}
@@ -187,6 +149,6 @@ object Benchmark {
case class Case(name: String, fn: ToolsTimer => Unit, numIters: Int)
case class JVMMemoryParams( avgGCTime:Double, avgGCCount:Double,
stdDevGCCount: Double, maxGCCount: Long, maxGcTime:Long)
- case class Result(avgMs: Double, bestMs: Double, stdevMs: Double,
+ case class Result(caseName: String, avgMs: Double, bestMs: Double, stdevMs: Double,
memoryParams: JVMMemoryParams)
}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index 36bf9d42c..191839f96 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -16,7 +16,13 @@
package org.apache.spark.rapids.tool.benchmarks
-import java.io.{File, FileOutputStream, OutputStream}
+import java.io.{File, FileOutputStream, PrintStream}
+
+import scala.collection.mutable
+
+import org.apache.commons.io.output.TeeOutputStream
+
+import org.apache.spark.sql.rapids.tool.util.RuntimeUtil
/**
* This code is mostly copied from org.apache.spark.benchmark.BenchmarkBase
@@ -26,16 +32,14 @@ import java.io.{File, FileOutputStream, OutputStream}
* the generated results.
*/
abstract class BenchmarkBase {
- var output: Option[OutputStream] = None
+ private var output: Option[PrintStream] = None
+ private var benchmark: Option[Benchmark] = None
/**
* Main process of the whole benchmark.
* Implementations of this method are supposed to use the wrapper method `runBenchmark`
* for each benchmark scenario.
*/
- def runBenchmarkSuite(iterations: Int,
- warmUpIterations: Int,
- outputFormat: String,
- inputArgs: Array[String]): Unit
+ def runBenchmarkSuite(inputArgs: Array[String]): Unit
final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
val separator = "=" * 96
@@ -47,6 +51,69 @@ abstract class BenchmarkBase {
def prefix: String = "rapids-tools-benchmark"
def suffix: String = ""
+ /**
+ * Add a benchmark case to the suite
+ * @param name Name of the benchmark case
+ * @param numIters Number of iterations to run
+ * @param f Function to run
+ */
+ def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+ benchmark.get.addCase(name, numIters)(f)
+ }
+
+ /**
+ * Method to trigger the benchmarker cases run method
+ */
+ def run(): Unit = {
+ // Getting correct output stream
+ val printStream = output.getOrElse(System.out)
+ // Running the underlying benchmark
+ val results: mutable.ArrayBuffer[Benchmark.Result] = benchmark.get.run()
+ // Generating the output report
+ val firstBest = results.head.bestMs
+ val nameLen = Math.max(40, results.map(_.caseName.length).max)
+ printStream.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
+ "Benchmark :", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)","Avg GC Time(ms)",
+ "Avg GC Count", "Stdev GC Count","Max GC Time(ms)","Max GC Count", "Relative")
+ printStream.println("-" * (nameLen + 160))
+ results.foreach { result =>
+ printStream.printf(s"%-${nameLen}s %14s %14s %11s %20s %18s %18s %18s %18s %10s\n",
+ result.caseName,
+ "%5.0f" format result.bestMs,
+ "%4.0f" format result.avgMs,
+ "%5.0f" format result.stdevMs,
+ "%5.1f" format result.memoryParams.avgGCTime,
+ "%5.1f" format result.memoryParams.avgGCCount,
+ "%5.0f" format result.memoryParams.stdDevGCCount,
+ "%5d" format result.memoryParams.maxGcTime,
+ "%5d" format result.memoryParams.maxGCCount,
+ "%3.2fX" format (firstBest / result.bestMs))
+ }
+ printStream.println()
+ }
+
+ /**
+ * Method to print the system run specific information
+ * @param warmUpIterations Total warm up iterations
+ * @param iterations Total runtime iterations
+ * @param inputArgs Input arguments
+ */
+ private def printSystemInformation(warmUpIterations: Int, iterations: Int,
+ inputArgs: String ): Unit = {
+ val jvmInfo = RuntimeUtil.getJVMOSInfo
+ output.get.printf(s"%-26s : %s \n","JVM Name", jvmInfo("jvm.name"))
+ output.get.printf(s"%-26s : %s \n","Java Version", jvmInfo("jvm.version"))
+ output.get.printf(s"%-26s : %s \n","OS Name", jvmInfo("os.name"))
+ output.get.printf(s"%-26s : %s \n","OS Version", jvmInfo("os.version"))
+ output.get.printf(s"%-26s : %s MB \n","MaxHeapMemory",
+ (Runtime.getRuntime.maxMemory()/1024/1024).toString)
+ output.get.printf(s"%-26s : %s \n","Total Warm Up Iterations",
+ warmUpIterations.toString)
+ output.get.printf(s"%-26s : %s \n","Total Runtime Iterations",
+ iterations.toString)
+ output.get.printf(s"%-26s : %s \n \n","Input Arguments", inputArgs)
+ }
+
/**
* Any shutdown code to ensure a clean shutdown
*/
@@ -65,16 +132,20 @@ abstract class BenchmarkBase {
if (!file.exists()) {
file.createNewFile()
}
- output = Some(new FileOutputStream(file))
- runBenchmarkSuite(benchArgs.iterations(),
- benchArgs.warmupIterations(),
- benchArgs.outputFormat(),
- benchArgs.inputArgs().split("\\s+").filter(_.nonEmpty))
- output.foreach { o =>
- if (o != null) {
- o.close()
- }
- }
+ // Creating a new output stream
+ // Using TeeOutputStream to multiplex output to both file and stdout
+ val outputStream = new FileOutputStream(file)
+ output = Some(new PrintStream(new TeeOutputStream(System.out, outputStream)))
+ benchmark = Some(new Benchmark(minNumIters = benchArgs.iterations(),
+ warmUpIterations = benchArgs.warmupIterations(),
+ outputPerIteration = true))
+ // Printing the system information
+ printSystemInformation(benchArgs.warmupIterations(),
+ benchArgs.iterations(), benchArgs.inputArgs())
+ // Passing the input arguments to the suite function
+ runBenchmarkSuite(benchArgs.inputArgs().split("\\s+").filter(_.nonEmpty))
+ // Closing the output stream
+ outputStream.close()
afterAll()
}
}
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
index e28e36ec4..f4ad493ad 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
@@ -28,32 +28,22 @@ import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
* 3. Write benchmarked code inside
*/
object SingleThreadedQualToolBenchmark extends BenchmarkBase {
- override def runBenchmarkSuite(iterations: Int,
- warmUpIterations: Int,
- outputFormat: String,
- inputArgs: Array[String]): Unit = {
+ override def runBenchmarkSuite(inputArgs: Array[String]): Unit = {
// Currently the input arguments are assumed to be common across cases
// This will be improved in a follow up PR to enable passing as a config
// file with argument support for different cases
runBenchmark("Benchmark_Per_SQL_Arg_Qualification") {
- val benchmarker =
- new Benchmark(
- valuesPerIteration = 2,
- output = output,
- outputPerIteration = true,
- warmUpIterations = warmUpIterations,
- minNumIters = iterations)
val (prefix, suffix) = inputArgs.splitAt(inputArgs.length - 1)
- benchmarker.addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
+ addCase("Enable_Per_SQL_Arg_Qualification") { _ =>
mainInternal(new QualificationArgs(prefix :+ "--per-sql" :+ "--num-threads"
:+ "1" :+ suffix.head),
enablePB = true)
}
- benchmarker.addCase("Disable_Per_SQL_Arg_Qualification") { _ =>
+ addCase("Disable_Per_SQL_Arg_Qualification") { _ =>
mainInternal(new QualificationArgs(prefix :+ "--num-threads" :+ "1" :+ suffix.head),
enablePB = true)
}
- benchmarker.run()
+ run()
}
}
}
From b6e7b3f76494ad51c58ba2be3a6ece298ed829ab Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Thu, 11 Jul 2024 09:54:58 -0500
Subject: [PATCH 16/26] Review comments changes
Signed-off-by: Sayed Bilal Bari
---
.../apache/spark/rapids/tool/benchmarks/Benchmark.scala | 7 ++++---
.../spark/rapids/tool/benchmarks/BenchmarkBase.scala | 4 +---
.../tool/benchmarks/SingleThreadedQualToolBenchmark.scala | 2 +-
3 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 7c2151bb6..12170f226 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -40,6 +40,7 @@ class Benchmark(
import Benchmark._
val benchmarks: mutable.ArrayBuffer[Case] = mutable.ArrayBuffer.empty[Benchmark.Case]
+ private val separator = "-" * 80
/**
* Adds a case to run when run() is called. The given function will be run for several
@@ -73,11 +74,11 @@ class Benchmark(
* a comment with the benchmark. Although the results vary from machine to machine, it should
* provide some baseline.
*/
- def run(): mutable.ArrayBuffer[Result] = {
+ def run(): Seq[Result] = {
require(benchmarks.nonEmpty)
- println("-" * 80)
+ println(separator)
println("Running benchmark: " + name)
- println("-" * 80)
+ println(separator)
val results = benchmarks.map { c =>
println(" RUNNING CASE : " + c.name)
println("-" * 80)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
index 191839f96..7e37e26f5 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/BenchmarkBase.scala
@@ -18,8 +18,6 @@ package org.apache.spark.rapids.tool.benchmarks
import java.io.{File, FileOutputStream, PrintStream}
-import scala.collection.mutable
-
import org.apache.commons.io.output.TeeOutputStream
import org.apache.spark.sql.rapids.tool.util.RuntimeUtil
@@ -68,7 +66,7 @@ abstract class BenchmarkBase {
// Getting correct output stream
val printStream = output.getOrElse(System.out)
// Running the underlying benchmark
- val results: mutable.ArrayBuffer[Benchmark.Result] = benchmark.get.run()
+ val results: Seq[Benchmark.Result] = benchmark.get.run()
// Generating the output report
val firstBest = results.head.bestMs
val nameLen = Math.max(40, results.map(_.caseName.length).max)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
index f4ad493ad..239894ad1 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/SingleThreadedQualToolBenchmark.scala
@@ -25,7 +25,7 @@ import com.nvidia.spark.rapids.tool.qualification.QualificationMain.mainInternal
* Usage -
* 1. Override the runBenchmarkSuite method
* 2. Write the benchmark code in the runBenchmark method passing relevant arguments
- * 3. Write benchmarked code inside
+ * 3. Define cases and invoke the function to be benchmarked with input arguments
*/
object SingleThreadedQualToolBenchmark extends BenchmarkBase {
override def runBenchmarkSuite(inputArgs: Array[String]): Unit = {
From 73d399220d1f3051e71ee6b3439fcd9823f72af4 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Thu, 11 Jul 2024 13:28:50 -0500
Subject: [PATCH 17/26] Review comment changes
Signed-off-by: Sayed Bilal Bari
---
.../org/apache/spark/rapids/tool/benchmarks/Benchmark.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 12170f226..5e676e379 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -40,7 +40,6 @@ class Benchmark(
import Benchmark._
val benchmarks: mutable.ArrayBuffer[Case] = mutable.ArrayBuffer.empty[Benchmark.Case]
- private val separator = "-" * 80
/**
* Adds a case to run when run() is called. The given function will be run for several
@@ -76,12 +75,13 @@ class Benchmark(
*/
def run(): Seq[Result] = {
require(benchmarks.nonEmpty)
+ val separator = "-" * 80
println(separator)
println("Running benchmark: " + name)
println(separator)
val results = benchmarks.map { c =>
println(" RUNNING CASE : " + c.name)
- println("-" * 80)
+ println(separator)
measure(c.name, c.numIters)(c.fn)
}
println
From 3d9290f84f950c13a26595719678e0ce0f4db913 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Thu, 11 Jul 2024 13:35:17 -0500
Subject: [PATCH 18/26] Adding separator as a val
Signed-off-by: Sayed Bilal Bari
---
.../apache/spark/rapids/tool/benchmarks/Benchmark.scala | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
index 5e676e379..a09f121ad 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/Benchmark.scala
@@ -94,6 +94,7 @@ class Benchmark(
*/
def measure(name: String, overrideNumIters: Int)(f: ToolsTimer => Unit): Result = {
System.gc() // ensures garbage from previous cases don't impact this one
+ val separator = "-" * 80
for (wi <- 0 until warmUpIterations) {
f(new ToolsTimer(-1))
}
@@ -111,14 +112,14 @@ class Benchmark(
gcCounts += memoryTracker.getTotalGCCount
gcTimes += memoryTracker.getTotalGCTime
if (outputPerIteration) {
- println("*"*80)
+ println(separator)
println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
- println("*"*80)
+ println(separator)
}
}
- println("*"*80)
+ println(separator)
println(s" Stopped after $minIters iterations, ${NANOSECONDS.toMillis(runTimes.sum)} ms")
- println("*"*80)
+ println(separator)
assert(runTimes.nonEmpty)
val bestRuntime = runTimes.min
val avgRuntime = runTimes.sum / runTimes.size
From d6bc2da4ef33d99d2e86606ddfb22f6e2c52f853 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Thu, 11 Jul 2024 14:01:53 -0500
Subject: [PATCH 19/26] Adding README for benchmark
Signed-off-by: Sayed Bilal Bari
---
.../spark/rapids/tool/benchmarks/README.md | 32 +++++++++++++++++++
1 file changed, 32 insertions(+)
create mode 100644 core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
new file mode 100644
index 000000000..a18db4a7f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -0,0 +1,32 @@
+# Benchmarking Tools
+
+This package contains the relevant classes to write and run benchmarks for `RAPIDS TOOLS` for Apache Spark.
+
+## Writing a benchmark -
+* Extend `BenchmarkBase` and override the `runBenchmarkSuite` function
+* Write logically similar benchmarks inside the `runBenchmark` which will add a header to the output with the name provided.
+* Now for each case to be tested, use the Benchmark class `addCase` to write various cases of the same Benchmark.
+* Call the run function to run the benchmark
+* Refer included example benchmark - `SingleThreadedQualToolBenchmark` for implementation details
+
+## Running the benchmark -
+Use the java command to run the created benchmark class with the following supported params -
+* `-i` : total number of iterations to run to calculate average metrics
+* `-w` : total number of warmup iterations to run before calculating the final metrics (warmup matters so that the final results are not skewed by initial JVM class-loading times)
+* `-o` : output directory where to store the final result file. Defaults to the directory rapids-tools-benchmark in the root directory
+* `-f` : output format of the stored result. Currently supports text. Json to be added in future iterations
+* `-a` : input arguments to pass the underlying benchmark classes
+
+#### Running the Benchmark class directly
+```shell
+java -cp $CLASSPATH \
+com.nvidia.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
+-i 3 -w 3 -a " --output-directory output eventlogs"
+```
+#### Running the Benchmark class using tools jar
+```shell
+java -cp /home/rapids-4-spark-tools_2.12-24.06.2-SNAPSHOT.jar:$SPARK_HOME/jars/* \
+org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
+-i 3 -w 3 -a "/home/eventlogs"
+```
+Include the `SPARK` jars in the classpath to benchmark `QUALIFICATION/PROFILING` tool
\ No newline at end of file
From 25a4d8009b12cbb0cea50efa885c4eb8ffbf7f5e Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Fri, 12 Jul 2024 11:51:03 -0500
Subject: [PATCH 20/26] Review changes for README
Signed-off-by: Sayed Bilal Bari
---
.../apache/spark/rapids/tool/benchmarks/README.md | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
index a18db4a7f..73fd1e104 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -7,7 +7,7 @@ This package contains the relevant classes to write and run benchmarks for `RAPI
* Write logically similar benchmarks inside the `runBenchmark` which will add a header to the output with the name provided.
* Now for each case to be tested, use the Benchmark class `addCase` to write various cases of the same Benchmark.
* Call the run function to run the benchmark
-* Refer included example benchmark - `SingleThreadedQualToolBenchmark` for implementation details
+* Refer included example benchmark - [`SingleThreadedQualToolBenchmark`](./SingleThreadedQualToolBenchmark.scala) for implementation details
## Running the benchmark -
Use the java command to run the created benchmark class with the following supported params -
@@ -20,13 +20,18 @@ Use the java command to run the created benchmark class with the following suppo
#### Running the Benchmark class directly
```shell
java -cp $CLASSPATH \
-com.nvidia.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
+org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " --output-directory output eventlogs"
```
+`CLASSPATH` should include the path relative to the Benchmarking class being passed.
+Refer to the example below for the same.
+
#### Running the Benchmark class using tools jar
```shell
-java -cp /home/rapids-4-spark-tools_2.12-24.06.2-SNAPSHOT.jar:$SPARK_HOME/jars/* \
+java -cp $RAPIDS_TOOLS_JAR:$SPARK_HOME/jars/* \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
--i 3 -w 3 -a "/home/eventlogs"
+-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
-Include the `SPARK` jars in the classpath to benchmark `QUALIFICATION/PROFILING` tool
\ No newline at end of file
+* `$RAPIDS_TOOLS_JAR` : Path to the rapids tools jar
+* `$SPARK_HOME/jars/*` : Include the `SPARK` jars in the classpath in case benchmarking `QUALIFICATION/PROFILING` tool
+* `$EVENT_LOGS_DIR` : Path to the event logs directory
\ No newline at end of file
From 9b59546d4adc1e511c8c1d8df9c86b09247f0716 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Fri, 12 Jul 2024 15:03:01 -0500
Subject: [PATCH 21/26] Corrected wording in README
Signed-off-by: Sayed Bilal Bari
---
.../scala/org/apache/spark/rapids/tool/benchmarks/README.md | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
index 73fd1e104..2b28a3cc8 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -23,8 +23,9 @@ java -cp $CLASSPATH \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " --output-directory output eventlogs"
```
-`CLASSPATH` should include the path relative to the Benchmarking class being passed.
-Refer to the example below for the same.
+`CLASSPATH` should be the path relative to which the Benchmarking class being passed.
+Below examples, classpath contains the tool jar relative to which the Benchmarking class is passed
+with the package name ( org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark ).
#### Running the Benchmark class using tools jar
```shell
From 2000e280302b0760b3b35b7d0cb4a93977e52e9b Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Fri, 12 Jul 2024 15:47:02 -0500
Subject: [PATCH 22/26] Updated README review changes
Signed-off-by: Sayed Bilal Bari
---
.../apache/spark/rapids/tool/benchmarks/README.md | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
index 2b28a3cc8..8f112e937 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -19,13 +19,12 @@ Use the java command to run the created benchmark class with the following suppo
#### Running the Benchmark class directly
```shell
-java -cp $CLASSPATH \
+java -cp $CLASSPATH:$SPARK_HOME/jars/* \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
--i 3 -w 3 -a " --output-directory output eventlogs"
+-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
-`CLASSPATH` should be the path relative to which the Benchmarking class being passed.
-Below examples, classpath contains the tool jar relative to which the Benchmarking class is passed
-with the package name ( org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark ).
+* `$CLASSPATH` : Path to the compiled class directory. Ex - `/target/*`
+* `$EVENT_LOGS_DIR` : Path to the event logs directory
#### Running the Benchmark class using tools jar
```shell
@@ -34,5 +33,7 @@ org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
* `$RAPIDS_TOOLS_JAR` : Path to the rapids tools jar
+* `$EVENT_LOGS_DIR` : Path to the event logs directory
+
+#### NOTES
* `$SPARK_HOME/jars/*` : Include the `SPARK` jars in the classpath in case benchmarking `QUALIFICATION/PROFILING` tool
-* `$EVENT_LOGS_DIR` : Path to the event logs directory
\ No newline at end of file
From f4d32937e911dde14c61ac4fd41f0fdabb38b5ee Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Fri, 12 Jul 2024 15:49:12 -0500
Subject: [PATCH 23/26] Correcting README typo
Signed-off-by: Sayed Bilal Bari
---
.../scala/org/apache/spark/rapids/tool/benchmarks/README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
index 8f112e937..d2234822f 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -23,7 +23,7 @@ java -cp $CLASSPATH:$SPARK_HOME/jars/* \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
-* `$CLASSPATH` : Path to the compiled class directory. Ex - `/target/*`
+* `$CLASSPATH` : Path to the compiled class directory. Ex - `/core/target/*`
* `$EVENT_LOGS_DIR` : Path to the event logs directory
#### Running the Benchmark class using tools jar
From a7db0ddacb30ada1d8bc58f94a1135cfc4988f45 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Fri, 12 Jul 2024 15:49:59 -0500
Subject: [PATCH 24/26] Correcting README typo
Signed-off-by: Sayed Bilal Bari
---
.../scala/org/apache/spark/rapids/tool/benchmarks/README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
index d2234822f..756aef81c 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -23,7 +23,7 @@ java -cp $CLASSPATH:$SPARK_HOME/jars/* \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
-* `$CLASSPATH` : Path to the compiled class directory. Ex - `/core/target/*`
+* `$CLASSPATH` : Path to the compiled class directory. Ex - `/core/target/*`
* `$EVENT_LOGS_DIR` : Path to the event logs directory
#### Running the Benchmark class using tools jar
From c424cc9100c77a56fc479e4518b8fc63739c5ed6 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Fri, 12 Jul 2024 16:11:15 -0500
Subject: [PATCH 25/26] README typo - RAPIDS_TOOLs -> SPARK_RAPIDS..
Signed-off-by: Sayed Bilal Bari
---
.../scala/org/apache/spark/rapids/tool/benchmarks/README.md | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
index 756aef81c..1b6657740 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -1,6 +1,6 @@
# Benchmarking Tools
-This package contains the relevant classes to write and run benchmarks for `RAPIDS TOOLS` for Apache Spark.
+This package contains the relevant classes to write and run benchmarks for `SPARK RAPIDS TOOLS` for Apache Spark.
## Writing a benchmark -
* Extend `BenchmarkBase` and override the `runBenchmarkSuite` function
@@ -28,11 +28,11 @@ org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
#### Running the Benchmark class using tools jar
```shell
-java -cp $RAPIDS_TOOLS_JAR:$SPARK_HOME/jars/* \
+java -cp $SPARK_RAPIDS_TOOLS_JAR:$SPARK_HOME/jars/* \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
-* `$RAPIDS_TOOLS_JAR` : Path to the rapids tools jar
+* `$SPARK_RAPIDS_TOOLS_JAR` : Path to the spark rapids tools jar
* `$EVENT_LOGS_DIR` : Path to the event logs directory
#### NOTES
From 17c2a05ed9ac30cdf40ab323274c545823468e0f Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Mon, 15 Jul 2024 10:03:31 -0500
Subject: [PATCH 26/26] Adding license header + README changes
Signed-off-by: Sayed Bilal Bari
---
.../spark/rapids/tool/benchmarks/README.md | 3 ++-
.../rapids/tool/util/MemoryMetricsTracker.scala | 16 ++++++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
index 1b6657740..8046b6321 100644
--- a/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
+++ b/core/src/main/scala/org/apache/spark/rapids/tool/benchmarks/README.md
@@ -19,11 +19,12 @@ Use the java command to run the created benchmark class with the following suppo
#### Running the Benchmark class directly
```shell
-java -cp $CLASSPATH:$SPARK_HOME/jars/* \
+java -cp $CLASSPATH:$SPARK_HOME/jars/*:$MAVEN_ARTIFACT_JAR \
org.apache.spark.rapids.tool.benchmarks.SingleThreadedQualToolBenchmark \
-i 3 -w 3 -a " $EVENT_LOGS_DIR"
```
* `$CLASSPATH` : Path to the compiled class directory. Ex - `/core/target/*`
+* `$MAVEN_ARTIFACT_JAR` : Path to maven-artifact jar. Download jar from [here](https://mvnrepository.com/artifact/org.apache.maven/maven-artifact/3.9.0)
* `$EVENT_LOGS_DIR` : Path to the event logs directory
#### Running the Benchmark class using tools jar
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala
index c796e413c..de2a9a98d 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/MemoryMetricsTracker.scala
@@ -1,3 +1,19 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.sql.rapids.tool.util
import java.lang.management.ManagementFactory