Skip to content

Commit

Permalink
Optimize implementation of getAggregateRawMetrics in core-tools (#1468)
Browse files Browse the repository at this point in the history
* Optimize implementation of getAggregateRawMetrics in core-tools
* address reviews and fix issues in aggregateDiagnostic

Contributes to #1461

This commit improves the implementation of aggregation accross raw
metrics by replacing the builtin scala collections with accumulators.

---------

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
  • Loading branch information
amahussein authored Dec 18, 2024
1 parent 4143ccc commit 18b0472
Show file tree
Hide file tree
Showing 10 changed files with 529 additions and 172 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* A helper class to facilitate the accumulation of aggregate metrics.
* This is a separate class to allow further customization in the future. For example,
* a parellel processor can be used to split the iterables without changing the caller side.
*/
class AggAccumHelper {

private def accumCachedRecords[R <: TaskMetricsAccumRec](
stageRecords: Iterable[StageAggTaskMetricsProfileResult],
rec: R): Unit = {
stageRecords.foreach(rec.addRecord)
rec.finalizeAggregation()
}

protected def createStageAccumRecord(): TaskMetricsAccumRec = {
StageAggAccum()
}

def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = {
val resRec = createStageAccumRecord()
taskRecords.foreach(resRec.addRecord)
resRec.finalizeAggregation()
resRec
}

def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = {
val resRec = SQLAggAccum()
accumCachedRecords(stageRecords, resRec)
resRec
}

def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = {
val resRec = JobAggAccum()
accumCachedRecords(stageRecords, resRec)
resRec
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

/**
* Implementation of AggAccumHelper for Photon.
* It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
* values are not available in the TaskModel.
*/
class AggAccumPhotonHelper(
shuffleWriteValues: Iterable[Long],
peakMemValues: Iterable[Long]) extends AggAccumHelper {

override def createStageAccumRecord(): TaskMetricsAccumRec = {
StageAggPhoton(shuffleWriteValues, peakMemValues)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* Accumulator for Job Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks/stages in a Job.
*/
case class JobAggAccum() extends TaskMetricsAccumRec {
override def addRecord(rec: TaskModel): Unit = {
throw new UnsupportedOperationException(
"Not implemented: JobAggAccum accepts only cached records")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* Accumulator for SQL Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks/stages in a SQL.
*/
case class SQLAggAccum(
var executorCpuRatio: Double = 0,
// Not added to the output since it is used only by the AutoTuner
var inputBytesReadAvg: Double = 0) extends TaskMetricsAccumRec {

override def finalizeAggregation(): Unit = {
super.finalizeAggregation()
executorCpuRatio = ToolUtils.calculateDurationPercent(executorCPUTimeSum, executorRunTimeSum)
inputBytesReadAvg = ToolUtils.calculateAverage(inputBytesReadSum, numTasks, 1)
}

override def addRecord(rec: TaskModel): Unit = {
throw new UnsupportedOperationException(
"Not implemented: SQLAggAccum accepts only cached records")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

/**
* Accumulator for Stage Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks in a Stage.
*/
case class StageAggAccum() extends TaskMetricsAccumRec {
override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
"calculate stage aggregates")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import java.util.concurrent.TimeUnit

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

/**
* Implementation of Accumulator object for Photon.
* It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
* values are not available in the TaskModel.
*/
case class StageAggPhoton(
shuffleWriteValues: Iterable[Long],
peakMemValues: Iterable[Long]) extends TaskMetricsAccumRec {

override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
"calculate stage aggregates")
}

override def finalizeAggregation(): Unit = {
// Fix the shuffleWriteTimes and the peakMemoryValues to use the shuffleWriteValues and
// the peakMemValues.
swWriteTimeSum = 0
peakExecutionMemoryMax = 0
if (!isEmptyAggregates) {
// Re-calculate the photon specific fields only if the accumulator has tasks.
// Otherwise, leave it as 0.
if (shuffleWriteValues.nonEmpty) {
swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
}
if (peakMemValues.nonEmpty) {
peakExecutionMemoryMax = peakMemValues.max
}
}
super.finalizeAggregation()
}
}
Loading

0 comments on commit 18b0472

Please sign in to comment.