Commit fe361a0: Release 0.17
LucaCanali committed Sep 3, 2020
Parent commit: 06b9fa9
Showing 5 changed files with 59 additions and 48 deletions.
README.md: 79 changes (48 additions, 31 deletions)

@@ -3,25 +3,31 @@
![sparkMeasure CI](https://github.com/LucaCanali/sparkMeasure/workflows/sparkMeasure%20CI/badge.svg?branch=master&event=push)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-measure_2.11/badge.svg)](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-measure_2.11)

### SparkMeasure is a tool for performance troubleshooting of Apache Spark workloads
SparkMeasure simplifies the collection and analysis of Spark performance metrics.
Use sparkMeasure for troubleshooting **interactive and batch** Spark workloads.
Use it also to collect metrics for long-term retention or as part of a **CI/CD** pipeline.
### SparkMeasure is a tool for performance troubleshooting of Apache Spark jobs
SparkMeasure simplifies the collection and analysis of Spark performance metrics.
Use sparkMeasure for troubleshooting **interactive and batch** Spark workloads.
Use it also to collect metrics for long-term retention or as part of a **CI/CD** pipeline.
SparkMeasure is also intended as a working example of how to use Spark Listeners for collecting Spark task metrics data.
* Main author and contact: Luca.Canali@cern.ch + credits to Viktor.Khristenko@cern.ch + thanks to PR contributors
* Compatibility: Spark 2.1.x and higher.

### Getting started with sparkMeasure, by example
* How to use: deploy [sparkMeasure from Maven Central](https://mvnrepository.com/artifact/ch.cern.sparkmeasure/spark-measure)
- Spark 2.x built with scala 2_11:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.16`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.16`
- note: `pip install sparkmeasure` to get the Python wrapper API.
- Spark 3.0.x and 2.4.x built with scala 2_12:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.16`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.16`
- note: `pip install sparkmeasure` to get the Python wrapper API.
- Bleeding edge: build from master using sbt: `sbt +package` and use the jars instead of packages.
* Main author and contact:
* Luca.Canali@cern.ch + credits to Viktor.Khristenko@cern.ch + thanks to PR contributors
* For Spark 2.x and 3.x
* Tested on Spark 2.4 and 3.0
* Spark 2.3 -> should also be OK
* Spark 2.1 and 2.2 -> use sparkMeasure version 0.16
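
The introduction above notes that sparkMeasure is also intended as a working example of how to use Spark Listeners to collect task metrics. As a rough illustration of that mechanism (a minimal sketch, not sparkMeasure's actual listener code; the class name and output format are mine), a listener that prints a few metrics per task could look like this:
```
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Minimal sketch of the Spark Listener mechanism that sparkMeasure builds on
// (illustration only, not the project's actual listener implementation).
class SimpleTaskMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, hence the Option guard
    Option(taskEnd.taskMetrics).foreach { m =>
      println(s"stage=${taskEnd.stageId} runTime=${m.executorRunTime} ms " +
        s"cpuTime=${m.executorCpuTime / 1000000} ms recordsRead=${m.inputMetrics.recordsRead}")
    }
  }
}

// Register it on a running SparkSession, e.g. from spark-shell:
// spark.sparkContext.addSparkListener(new SimpleTaskMetricsListener)
```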

### Getting started with sparkMeasure
* Note: sparkMeasure is available on [Maven Central](https://mvnrepository.com/artifact/ch.cern.sparkmeasure/spark-measure)
* Spark 3.0.x and 2.4.x with Scala 2.12:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17`
- note: `pip install sparkmeasure` to get the Python wrapper API.
* Spark 2.x with Scala 2.11:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17`
- note: `pip install sparkmeasure` to get the Python wrapper API.
* Bleeding edge: build sparkMeasure jar using sbt: `sbt +package` and use `--jars`
with the jar just built instead of using `--packages`.
* Note: find the latest jars already built as artifacts in the [GitHub actions](https://github.com/LucaCanali/sparkMeasure/actions)

- [<img src="https://upload.wikimedia.org/wikipedia/commons/6/63/Databricks_Logo.png" height="40"> Scala notebook on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2061385495597958/2729765977711377/442806354506758/latest.html)

@@ -33,10 +39,10 @@ SparkMeasure is also intended as a working example of how to use Spark Listeners

- [<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/3/38/Jupyter_logo.svg/250px-Jupyter_logo.svg.png" height="50"> Local Python/Jupyter Notebook](examples/SparkMeasure_Jupyter_Python_getting_started.ipynb)

- CLI: spark-shell and pyspark
- CLI: spark-shell and PySpark
```
# Scala CLI, Spark 3.0
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.16
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())
@@ -57,32 +63,43 @@ Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 14594 (15 s)
stageDuration => 14498 (14 s)
executorRunTime => 108563 (1.8 min)
executorCpuTime => 106613 (1.8 min)
executorDeserializeTime => 4149 (4 s)
executorDeserializeCpuTime => 1025 (1 s)
resultSerializationTime => 1 (1 ms)
jvmGCTime => 64 (64 ms)
elapsedTime => 13520 (14 s)
stageDuration => 13411 (13 s)
executorRunTime => 100020 (1.7 min)
executorCpuTime => 98899 (1.6 min)
executorDeserializeTime => 4358 (4 s)
executorDeserializeCpuTime => 1887 (2 s)
resultSerializationTime => 2 (2 ms)
jvmGCTime => 56 (56 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 15 (15 ms)
shuffleWriteTime => 11 (11 ms)
resultSize => 19955 (19.0 KB)
numUpdatedBlockStatuses => 0
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 Bytes)
shuffleRecordsWritten => 8
```
- CLI: spark-shell, measure workload metrics aggregating from raw task metrics
```
# Scala CLI, Spark 3.0
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
taskMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())
```
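
Besides `runAndMeasure`, the sparkMeasure documentation also describes an explicit begin/end pattern for instrumenting an arbitrary block of code. A minimal sketch (method names `begin`, `end` and `printReport` as documented for StageMetrics; verify them against the version you deploy):
```
// Sketch: explicit instrumentation of a code block with StageMetrics,
// as an alternative to wrapping the code in runAndMeasure.
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000)").show()
stageMetrics.end()
stageMetrics.printReport()
```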

### One tool for different use cases, links to documentation and examples
* **Interactive mode**: use sparkMeasure to collect and analyze Spark workload metrics real-time when
build.sbt: 4 changes (2 additions, 2 deletions)

@@ -1,13 +1,13 @@
name := "spark-measure"

version := "0.17-SNAPSHOT"
version := "0.17"

scalaVersion := "2.12.10"
crossScalaVersions := Seq("2.11.12", "2.12.10")

licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))

isSnapshot := true
isSnapshot := false

spName := "spark-measure"
sparkVersion := "2.4.6"
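For projects that consume the released artifact from a build file rather than via `--packages`, the corresponding sbt dependency would look roughly as follows (coordinates taken from the Maven Central entry referenced in the README; a sketch, not part of this commit):
```
// Pull the 0.17 release from Maven Central in a user's own build.sbt;
// %% appends the _2.11 or _2.12 suffix based on the project's scalaVersion.
libraryDependencies += "ch.cern.sparkmeasure" %% "spark-measure" % "0.17"
```
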
docs/TODO_and_issues.md: 9 changes (1 addition, 8 deletions)

@@ -9,23 +9,16 @@ If you plan to contribute to sparkMeasure development, please start by reviewing
on the user to validate the output.
* TODO: Task metrics values collected by sparkMeasure are only for successfully executed tasks. Note that
resources used by failed tasks are not collected in the current version. Can this be improved?
* We can expect more task metrics to be added in future versions. The current code is not version aware and does
not offer an easy way to handle additional metrics only for newer versions without breaking backward compatibility.
TODO: implement Spark version awareness and a custom list of metrics in sparkMeasure
+ Following [SPARK PR 18249](https://github.com/apache/spark/pull/18249/files), add support for the
remoteBytesReadToDisk task metric (this is relevant for Spark 2.3.x and above).
* TODO: Flight recorder mode, task metrics, find ways to write metrics out to output files incrementally,
rather than using the current approach of buffering everything in memory and writing at the end?
The current approach has obvious scalability issues.
* TODO: write more tests to be executed by travis CI
* TODO: write more tests to be executed by GitHub CI actions
* TODO: add code/exceptions to handle error conditions that can arise in sparkMeasure code
* TODO: add more statistics related to job execution, for example report the start/min/max number of executors
the job had, which is useful in the case of YARN with Spark dynamic allocation
* TODO (maybe): add additional sinks for the collected metrics and aggregations besides Prometheus;
two possible candidates are Kafka and InfluxDB
* ~~TODO (maybe): remove _updatedBlockStatuses from the list of metrics collected by sparkMeasure.
This follows [SPARK PR 18162](https://github.com/apache/spark/pull/18162):
TaskMetrics._updatedBlockStatuses is off by default.~~
* TODO (maybe): implement in sparkMeasure the removeSparkListener method, to allow stopping data collection
from sparkMeasure (note: this is only possible from Spark versions 2.2 and above)
* gatherAccumulables=true for taskMetrics(sparkSession: SparkSession, gatherAccumulables: Boolean)
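One of the items above mentions implementing a way to stop data collection via removeSparkListener. For reference, Spark itself exposes addSparkListener/removeSparkListener on SparkContext from version 2.2 onwards, so detaching a custom listener (here the hypothetical SimpleTaskMetricsListener sketched earlier on this page) looks roughly like:
```
// Sketch: detaching a listener with the SparkContext API available from Spark 2.2 onwards.
// SimpleTaskMetricsListener is the hypothetical listener from the earlier sketch.
val listener = new SimpleTaskMetricsListener()
spark.sparkContext.addSparkListener(listener)
// ... run the workload to be measured ...
spark.sparkContext.removeSparkListener(listener)
```
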
src/main/scala/ch/cern/sparkmeasure/influxdbsink.scala: 1 change (0 additions, 1 deletion)

@@ -123,7 +123,6 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener {

if (logStageMetrics) {
val taskmetrics = stageCompleted.stageInfo.taskMetrics
// TODO: add all the available metrics
val point2 = Point.measurement("stage_metrics")
.tag("applicationId", appId)
.time(completionTime, TimeUnit.MILLISECONDS)
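The excerpt above is from the InfluxDBSink listener used for flight recorder mode: it is registered as a Spark extra listener and writes Spark metrics as InfluxDB data points. Enabling it looks roughly like the sketch below (the `spark.sparkmeasure.influxdbURL` key and the URL are assumptions to verify against the sparkMeasure flight recorder docs):
```
# Sketch: flight recorder mode with the InfluxDB sink
# (configuration key and endpoint URL are placeholders to verify)
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17 \
  --conf spark.extraListeners=ch.cern.sparkmeasure.InfluxDBSink \
  --conf spark.sparkmeasure.influxdbURL="http://localhost:8086"
```
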
src/test/scala/ch/cern/sparkmeasure/UtilsTest.scala: 14 changes (8 additions, 6 deletions)

@@ -12,25 +12,27 @@ class UtilsTest extends FlatSpec with Matchers {
submissionTime = 10, completionTime = 11, stageDuration = 12, numTasks = 13,
executorRunTime = 14, executorCpuTime = 15,
executorDeserializeTime = 16, executorDeserializeCpuTime = 17,
resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20, numUpdatedBlockStatuses = 21,
resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20,
diskBytesSpilled = 30, memoryBytesSpilled = 31, peakExecutionMemory = 32, recordsRead = 33,
bytesRead = 34, recordsWritten = 35, bytesWritten = 36,
shuffleFetchWaitTime = 40, shuffleTotalBytesRead = 41, shuffleTotalBlocksFetched = 42,
shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleWriteTime = 45,
shuffleBytesWritten = 46, shuffleRecordsWritten = 47
shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleLocalBytesRead = 45,
shuffleRemoteBytesRead = 46, shuffleRemoteBytesReadToDisk = 47, shuffleRecordsRead = 48,
shuffleWriteTime = 50, shuffleBytesWritten = 51, shuffleRecordsWritten = 52
)

val taskVals0 = TaskVals(jobId = 1, jobGroup = "test", stageId = 2, index = 3, launchTime = 4, finishTime = 5,
duration = 10, schedulerDelay = 11, executorId = "exec0", host = "host0", taskLocality = 12,
speculative = false, gettingResultTime = 12, successful = true,
executorRunTime = 14, executorCpuTime = 15,
executorDeserializeTime = 16, executorDeserializeCpuTime = 17,
resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20, numUpdatedBlockStatuses = 21,
resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20,
diskBytesSpilled = 30, memoryBytesSpilled = 31, peakExecutionMemory = 32, recordsRead = 33,
bytesRead = 34, recordsWritten = 35, bytesWritten = 36,
shuffleFetchWaitTime = 40, shuffleTotalBytesRead = 41, shuffleTotalBlocksFetched = 42,
shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleWriteTime = 45,
shuffleBytesWritten = 46, shuffleRecordsWritten = 47
shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleLocalBytesRead = 45,
shuffleRemoteBytesRead = 46, shuffleRemoteBytesReadToDisk = 47, shuffleRecordsRead = 48,
shuffleWriteTime = 50, shuffleBytesWritten = 51, shuffleRecordsWritten = 52
)

it should "write and read back StageVal (Java Serialization)" in {
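The fixtures above feed the serialization round-trip tests whose bodies are truncated in this view. A generic sketch of such a check in the same FlatSpec/Matchers style, not the repository's actual test body, could read:
```
// Generic sketch of a Java-serialization round-trip check for the stageVals0 fixture
// (illustration only; the real test body is not shown in this diff).
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

it should "round-trip StageVals through Java serialization" in {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(stageVals0)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  val readBack = in.readObject().asInstanceOf[StageVals]
  readBack shouldEqual stageVals0
}
```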
