Skip to content

Commit

Permalink
Optimize implementation of getAggregateRawMetrics in core-tools
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

Contributes to #1461

This commit improves the implementation of aggregation accross raw
metrics by replacing the builtin scala collections with accumulators.
  • Loading branch information
amahussein committed Dec 17, 2024
1 parent 9564d0b commit b637ebf
Show file tree
Hide file tree
Showing 8 changed files with 515 additions and 156 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* A helper class to facilitate the accumulation of aggregate metrics.
* This is a separate class to allow further customization in the future. For example,
* a parellel processor can be used to split the iterables without changing the caller side.
*/
class AggAccumHelper {
private def initializeRecord(rec: TaskMetricsAccumRec, iterable: Iterable[Any]): Unit = {
if (iterable.isEmpty) { // Reset aggregate fields for empty collections
rec.resetFields()
}
}

private def accumCachedRecords[R <: TaskMetricsAccumRec](
stageRecords: Iterable[StageAggTaskMetricsProfileResult],
rec: R): Unit = {
stageRecords.foreach(rec.addRecord)
rec.finalizeAggregation()
}

protected def createStageAccumRecord(): TaskMetricsAccumRec = {
StageAggAccum()
}

def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = {
val resRec = createStageAccumRecord()
initializeRecord(resRec, taskRecords)
taskRecords.foreach(resRec.addRecord)
resRec.finalizeAggregation()
resRec
}

def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = {
val resRec = SQLAggAccum()
initializeRecord(resRec, stageRecords)
accumCachedRecords(stageRecords, resRec)
resRec
}

def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = {
val resRec = JobAggAccum()
initializeRecord(resRec, stageRecords)
accumCachedRecords(stageRecords, resRec)
resRec
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

/**
* Implementation of AggAccumHelper for Photon.
* It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
* values are not available in the TaskModel.
*/
class AggAccumPhotonHelper(
shuffleWriteValues: Iterable[Long],
peakMemValues: Iterable[Long]) extends AggAccumHelper {

override def createStageAccumRecord(): TaskMetricsAccumRec = {
StageAggPhoton(shuffleWriteValues, peakMemValues)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* Accumulator for Job Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks/stages in a Job.
*/
case class JobAggAccum() extends TaskMetricsAccumRec {
override def addRecord(rec: TaskModel): Unit = {
throw new UnsupportedOperationException(
"Not implemented: JobAggAccum accepts only cached records")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* Accumulator for SQL Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks/stages in a SQL.
*/
case class SQLAggAccum(
var executorCpuRatio: Double = 0,
// Not added to the output since it is used only by the AutoTuner
var inputBytesReadAvg: Double = 0) extends TaskMetricsAccumRec {

override def finalizeAggregation(): Unit = {
super.finalizeAggregation()
executorCpuRatio = ToolUtils.calculateDurationPercent(executorCPUTimeSum, executorRunTimeSum)
inputBytesReadAvg = ToolUtils.calculateAverage(inputBytesReadSum, numTasks, 1)
}

override def addRecord(rec: TaskModel): Unit = {
throw new UnsupportedOperationException(
"Not implemented: SQLAggAccum accepts only cached records")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

/**
* Accumulator for Stage Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks in a Stage.
*/
case class StageAggAccum() extends TaskMetricsAccumRec {
override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
"calculate stage aggregates")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import java.util.concurrent.TimeUnit

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

/**
* Implementation of Accumulator object for Photon.
* It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
* values are not available in the TaskModel.
*/
case class StageAggPhoton(
shuffleWriteValues: Iterable[Long],
peakMemValues: Iterable[Long]) extends TaskMetricsAccumRec {

override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
"calculate stage aggregates")
}

override def finalizeAggregation(): Unit = {
// Fix the shuffleWriteTimes and the peakMemoryValues to use the shuffleWriteValues and
// the peakMemValues.
swWriteTimeSum = 0
peakExecutionMemoryMax = 0
if (shuffleWriteValues.nonEmpty) {
swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
}
if (peakMemValues.nonEmpty) {
peakExecutionMemoryMax = peakMemValues.max
}
super.finalizeAggregation()
}
}
Loading

0 comments on commit b637ebf

Please sign in to comment.