Optimize implementation of getAggregateRawMetrics in core-tools #1468

Merged 2 commits on Dec 18, 2024

Changes from 1 commit
Diff: AppSparkMetricsAnalyzer

@@ -25,7 +25,7 @@ import com.nvidia.spark.rapids.tool.profiling._

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef, AccumNameRef}
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, AccumMetaRef}

/**
* Does analysis on the DataFrames from object of AppBase.
@@ -85,10 +85,10 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
} else {
val jobAggAccumulator = new AggAccumHelper()
val perJobRec = jobAggAccumulator.accumPerJob(
jc.stageIds.filter(stageLevelSparkMetrics(index).contains)
.map { stageId =>
jc.stageIds.collect {
case stageId if stageLevelSparkMetrics(index).contains(stageId) =>
stageLevelSparkMetrics(index)(stageId)
})
})
if (perJobRec.isEmptyAggregates) {
None
} else {
@@ -178,10 +178,10 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
// TODO: Should we only consider successful tasks?
val sqlAggAccumulator = new AggAccumHelper()
val preSqlRec = sqlAggAccumulator.accumPerSQL(
stagesInSQL.filter(stageLevelSparkMetrics(index).contains)
.map { stageId =>
stagesInSQL.collect {
case stageId if stageLevelSparkMetrics(index).contains(stageId) =>
stageLevelSparkMetrics(index)(stageId)
})
})
if (preSqlRec.isEmptyAggregates) {
None
} else {
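Both hunks above replace a filter followed by a map with a single collect pass over the stage IDs. A minimal, self-contained sketch of the equivalence, on illustrative stand-in data rather than the tool's metric types:

```scala
// Illustrative only: a stand-in lookup of per-stage metrics.
val stageMetrics = Map(1 -> "stage-1-metrics", 3 -> "stage-3-metrics")
val stageIds = Seq(1, 2, 3)

// Before: two passes, one to filter the IDs and a second to look each one up again.
val twoPasses = stageIds.filter(stageMetrics.contains).map(stageMetrics(_))

// After: one pass, with the guard and the lookup combined in a partial function.
val onePass = stageIds.collect {
  case stageId if stageMetrics.contains(stageId) => stageMetrics(stageId)
}

assert(twoPasses == onePass) // both yield Seq("stage-1-metrics", "stage-3-metrics")
```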
@@ -322,20 +322,21 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
app.asInstanceOf[ApplicationInfo].planMetricProcessor
}
val zeroAccumProfileResults =
AccumProfileResults(0, 0, AccumMetaRef(0L, AccumNameRef("")), 0L, 0L, 0L, 0L)

AccumProfileResults(0, 0, AccumMetaRef.EMPTY_ACCUM_META_REF, 0L, 0L, 0L, 0L)
val emptyNodeNames = Seq.empty[String]
val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
Comment on lines +325 to +327
Collaborator (Author):
@cindyyuanjiang

  • It is better to avoid creating metrics/nodeNames with empty strings: they are easy to miss and can lead to problems in the CSV files or when joining on metric names. That's why I replaced the empty string with "N/A".
  • Moved the creation of the default values outside the map block (see the sketch after this thread).

Collaborator:
thanks @amahussein!
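A minimal sketch of the hoisting mentioned in the comment above, with hypothetical names standing in for the tool's fields: values that are the same on every iteration are built once, before the map, instead of once per element:

```scala
// Hypothetical per-stage lookup of node names (illustrative only).
val stageToNodeNames = Map(1 -> Seq("Scan parquet"), 2 -> Seq("HashAggregate"))
val stageIds = Seq(1, 2, 3)

// Hoisted default: allocated once and shared by every stage without an entry,
// rather than re-created inside each map iteration.
val emptyNodeNames = Seq.empty[String]

val nodeNamesPerStage = stageIds.map { stageId =>
  stageId -> stageToNodeNames.getOrElse(stageId, emptyNodeNames)
}
// Seq((1, Seq("Scan parquet")), (2, Seq("HashAggregate")), (3, Seq()))
```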

// TODO: this has stage attempts. we should handle different attempts
app.stageManager.getAllStages.map { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
val numTasks = tasksInStage.size
val nodeNames = sqlAnalyzer.stageToNodeNames.
getOrElse(sm.stageInfo.stageId, Seq.empty[String])
val diagnosticMetricsMap = sqlAnalyzer.stageToDiagnosticMetrics.
getOrElse(sm.stageInfo.stageId, HashMap.empty[String, AccumProfileResults]).
withDefaultValue(zeroAccumProfileResults)
val nodeNames = sqlAnalyzer.stageToNodeNames.getOrElse(sm.stageInfo.stageId, emptyNodeNames)
val diagnosticMetricsMap =
sqlAnalyzer.stageToDiagnosticMetrics
.getOrElse(sm.stageInfo.stageId, emptyDiagnosticMetrics)
Collaborator (Author):

Reformatted the code because it was hard to see that withDefaultValue is applied to the result of getOrElse.

.withDefaultValue(zeroAccumProfileResults)
val srTotalBytesMetrics =
AppSparkMetricsAnalyzer.getStatistics(tasksInStage.map(_.sr_totalBytesRead))

@@ -450,8 +451,6 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
perStageRec.srTotalBytesReadSum,
perStageRec.swBytesWrittenSum,
perStageRec.swRecordsWrittenSum,
// Leave this timeUnit in NanoSeconds so that it will be more accurate when we take
// aggregates on higher levels (i.e., SQL/Job)
perStageRec.swWriteTimeSum)
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
}
@@ -477,12 +476,4 @@ object AppSparkMetricsAnalyzer {
StatisticsMetrics(sortedArr.head, med, sortedArr(len - 1), sortedArr.sum)
}
}

def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
if (arr.isEmpty) {
0L
} else {
arr.max
}
}
}
Diff: AggAccumHelper

@@ -26,11 +26,6 @@ import org.apache.spark.sql.rapids.tool.store.TaskModel
* a parallel processor can be used to split the iterables without changing the caller side.
*/
class AggAccumHelper {
private def initializeRecord(rec: TaskMetricsAccumRec, iterable: Iterable[Any]): Unit = {
if (iterable.isEmpty) { // Reset aggregate fields for empty collections
rec.resetFields()
}
}

private def accumCachedRecords[R <: TaskMetricsAccumRec](
stageRecords: Iterable[StageAggTaskMetricsProfileResult],
@@ -45,22 +40,19 @@

def accumPerStage(taskRecords: Iterable[TaskModel]): TaskMetricsAccumRec = {
val resRec = createStageAccumRecord()
initializeRecord(resRec, taskRecords)
Collaborator:

I was also wondering about the need for initializeRecord() here before.

taskRecords.foreach(resRec.addRecord)
resRec.finalizeAggregation()
resRec
}

def accumPerSQL(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): SQLAggAccum = {
val resRec = SQLAggAccum()
initializeRecord(resRec, stageRecords)
accumCachedRecords(stageRecords, resRec)
resRec
}

def accumPerJob(stageRecords: Iterable[StageAggTaskMetricsProfileResult]): JobAggAccum = {
val resRec = JobAggAccum()
initializeRecord(resRec, stageRecords)
accumCachedRecords(stageRecords, resRec)
resRec
}
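With initializeRecord() removed, empty inputs are handled inside finalizeAggregation() itself (see the TaskMetricsAccumRec hunk further down). A rough usage sketch, assuming the signatures shown in this diff:

```scala
import org.apache.spark.sql.rapids.tool.store.TaskModel

val helper = new AggAccumHelper()

// No tasks at all: addRecord() is never invoked, and finalizeAggregation()
// sees numTasks == 0 and zeroes the min/max-style fields via resetFields().
val emptyRec = helper.accumPerStage(Iterable.empty[TaskModel])
assert(emptyRec.isEmptyAggregates)
assert(emptyRec.durationMax == 0L)
```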
Diff: StageAggPhoton

@@ -39,11 +39,15 @@ case class StageAggPhoton(
// the peakMemValues.
swWriteTimeSum = 0
peakExecutionMemoryMax = 0
if (shuffleWriteValues.nonEmpty) {
swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
}
if (peakMemValues.nonEmpty) {
peakExecutionMemoryMax = peakMemValues.max
if (!isEmptyAggregates) {
// Re-calculate the photon specific fields only if the accumulator has tasks.
// Otherwise, leave it as 0.
if (shuffleWriteValues.nonEmpty) {
swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
}
if (peakMemValues.nonEmpty) {
peakExecutionMemoryMax = peakMemValues.max
}
}
super.finalizeAggregation()
}
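For context on why swWriteTimeSum stays in nanoseconds until this finalize step (the rationale behind the comment removed in the AppSparkMetricsAnalyzer hunk above), a small illustrative calculation:

```scala
import java.util.concurrent.TimeUnit

// Illustrative per-task shuffle write times, in nanoseconds.
val shuffleWriteValues = Seq(400000L, 700000L, 900000L)

// Summing in nanoseconds first, then converting, keeps sub-millisecond contributions.
val sumThenConvert = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)         // 2 ms
// Converting each task's value first would truncate every one of them to 0.
val convertThenSum = shuffleWriteValues.map(TimeUnit.NANOSECONDS.toMillis(_)).sum  // 0 ms
```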
Diff: TaskMetricsAccumRec

@@ -63,12 +63,17 @@ class TaskMetricsAccumRec {
*/
def isEmptyAggregates: Boolean = numTasks == 0

/**
* Reset all fields to 0. This is used when the Task iterator is empty,
* in which case fields such as "max" should be reset to 0.
*/
def resetFields(): Unit = {
Collaborator:

nit: Can we add a comment on why we need to reset the fields here?

Collaborator (Author):

Done! Also refactored the code to do the reset within the class itself, which is better OOP.

durationMax = 0
durationMin = 0
peakExecutionMemoryMax = 0
resultSizeMax = 0
}
nartal1 marked this conversation as resolved.

def addRecord(rec: TaskModel): Unit = {
numTasks += 1
// SumFields
@@ -102,6 +107,7 @@
// Min Fields
durationMin = math.min(durationMin, rec.duration)
}

def addRecord(rec: StageAggTaskMetricsProfileResult): Unit = {
// Sums
numTasks += rec.numTasks
@@ -143,5 +149,9 @@
*/
def finalizeAggregation(): Unit = {
durationAvg = ToolUtils.calculateAverage(durationSum, numTasks, 1)
if (numTasks < 1) {
// If the number of tasks is 0, reset fields such as max and min to 0.
resetFields()
}
}
}
Diff: AccumMetaRef

@@ -27,6 +27,7 @@ case class AccumMetaRef(id: Long, name: AccumNameRef) {
}

object AccumMetaRef {
val EMPTY_ACCUM_META_REF: AccumMetaRef = new AccumMetaRef(0L, AccumNameRef.EMPTY_ACC_NAME_REF)
def apply(id: Long, name: Option[String]): AccumMetaRef =
new AccumMetaRef(id, AccumNameRef.getOrCreateAccumNameRef(name))
}
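The new EMPTY_ACCUM_META_REF reuses AccumNameRef.EMPTY_ACC_NAME_REF, which is why the private modifier is dropped in the next file. A compressed sketch of the pattern with simplified stand-in classes (not the tool's full definitions):

```scala
// Simplified stand-ins, for illustration only.
case class AccumNameRef(value: String)

object AccumNameRef {
  // Shared "no name" sentinel: "N/A" is easier to spot than "" in CSV output and
  // in joins on metric names, and it must be visible to AccumMetaRef below.
  val EMPTY_ACC_NAME_REF: AccumNameRef = AccumNameRef("N/A")
}

case class AccumMetaRef(id: Long, name: AccumNameRef)

object AccumMetaRef {
  // One shared empty ref instead of building AccumMetaRef(0L, AccumNameRef("")) at call sites.
  val EMPTY_ACCUM_META_REF: AccumMetaRef = AccumMetaRef(0L, AccumNameRef.EMPTY_ACC_NAME_REF)
}
```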
Diff: AccumNameRef

@@ -42,7 +42,7 @@ case class AccumNameRef(value: String) {
object AccumNameRef {
// Dummy AccNameRef to represent None accumulator names. This is an optimization to avoid
// storing an option[string] for all accumulable names which leads to "get-or-else" everywhere.
private val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A")
val EMPTY_ACC_NAME_REF: AccumNameRef = new AccumNameRef("N/A")
// A global table to store reference to all accumulator names. The map is accessible by all
// threads (different applications) running in parallel. This avoids duplicate work across
// different threads.