Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize implementation of getAggregateRawMetrics in core-tools #1468

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* A helper class to facilitate the accumulation of aggregate metrics.
* This is a separate class to allow further customization in the future. For example,
* a parellel processor can be used to split the iterables without changing the caller side.
*/
class AggAccumHelper {
private def initializeRecord(rec: TaskMetricsAccumRec, iterable: Iterable[Any]): Unit = {
if (iterable.isEmpty) { // Reset aggregate fields for empty collections
rec.resetFields()
}
}

private def accumCachedRecords[R <: TaskMetricsAccumRec](
stageRecords: Iterable[StageAggTaskMetricsProfileResult],
rec: R): Unit = {
stageRecords.foreach(rec.addRecord)
rec.finalizeAggregation()
}

protected def createStageAccumRecord(): TaskMetricsAccumRec = {
StageAggAccum()
}

def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = {
val resRec = createStageAccumRecord()
initializeRecord(resRec, taskRecords)
taskRecords.foreach(resRec.addRecord)
resRec.finalizeAggregation()
resRec
}

def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = {
val resRec = SQLAggAccum()
initializeRecord(resRec, stageRecords)
accumCachedRecords(stageRecords, resRec)
resRec
}

def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = {
val resRec = JobAggAccum()
initializeRecord(resRec, stageRecords)
accumCachedRecords(stageRecords, resRec)
resRec
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

/**
* Implementation of AggAccumHelper for Photon.
* It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
* values are not available in the TaskModel.
*/
class AggAccumPhotonHelper(
shuffleWriteValues: Iterable[Long],
peakMemValues: Iterable[Long]) extends AggAccumHelper {

override def createStageAccumRecord(): TaskMetricsAccumRec = {
StageAggPhoton(shuffleWriteValues, peakMemValues)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* Accumulator for Job Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks/stages in a Job.
*/
case class JobAggAccum() extends TaskMetricsAccumRec {
override def addRecord(rec: TaskModel): Unit = {
throw new UnsupportedOperationException(
"Not implemented: JobAggAccum accepts only cached records")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.store.TaskModel

/**
* Accumulator for SQL Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks/stages in a SQL.
*/
case class SQLAggAccum(
var executorCpuRatio: Double = 0,
// Not added to the output since it is used only by the AutoTuner
var inputBytesReadAvg: Double = 0) extends TaskMetricsAccumRec {

override def finalizeAggregation(): Unit = {
super.finalizeAggregation()
executorCpuRatio = ToolUtils.calculateDurationPercent(executorCPUTimeSum, executorRunTimeSum)
inputBytesReadAvg = ToolUtils.calculateAverage(inputBytesReadSum, numTasks, 1)
}

override def addRecord(rec: TaskModel): Unit = {
throw new UnsupportedOperationException(
"Not implemented: SQLAggAccum accepts only cached records")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

/**
* Accumulator for Stage Aggregates.
* This is an optimization to avoid using the Scala collections API on each field for the entire
* number of tasks in a Stage.
*/
case class StageAggAccum() extends TaskMetricsAccumRec {
override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
"calculate stage aggregates")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis.util

import java.util.concurrent.TimeUnit

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

/**
* Implementation of Accumulator object for Photon.
* It takes the shuffleWriteValues and peakMemValues Accumulables as an argument because those
* values are not available in the TaskModel.
*/
case class StageAggPhoton(
shuffleWriteValues: Iterable[Long],
peakMemValues: Iterable[Long]) extends TaskMetricsAccumRec {

override def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
throw new UnsupportedOperationException("Not implemented: Cannot use cached results to" +
"calculate stage aggregates")
}

override def finalizeAggregation(): Unit = {
// Fix the shuffleWriteTimes and the peakMemoryValues to use the shuffleWriteValues and
// the peakMemValues.
swWriteTimeSum = 0
peakExecutionMemoryMax = 0
if (shuffleWriteValues.nonEmpty) {
swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
}
if (peakMemValues.nonEmpty) {
peakExecutionMemoryMax = peakMemValues.max
}
super.finalizeAggregation()
}
}
Loading
Loading