Skip to content

Commit

Permalink
Refactor TaskEnd to be accessible by Q/P tools (#1000)
Browse files Browse the repository at this point in the history
* Refactor TaskEnd to be accessible by Q/P tools

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

Contributes to #980

* Add Stage Accumulables to the accumulable objects for Q tool

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* Refactor the code to allow Qual tool to generate same CSV files as Prof

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* Cleaning up some naming conventions

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* Fix typo in job/stage qual agg metrics

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* Remove redundant sort function in skewshuffle analyzer

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* Fix typos and remove unused classes from ProfileClassWarehouse

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

---------

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>
  • Loading branch information
amahussein authored May 16, 2024
1 parent a82c2a6 commit 0cc500f
Show file tree
Hide file tree
Showing 36 changed files with 2,062 additions and 1,018 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobStageAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult}

/**
 * Holds the aggregated raw metrics of a single application.
 *
 * This case class separates the aggregation of the metrics from the way the views are
 * generated. For example, the profiler tool currently merges the job-level and stage-level
 * results into a single list. As a step toward separating the logic from the views, the
 * analyzer returns an AggRawMetricsResult that keeps the aggregated metrics for jobs, stages,
 * SQLs, and IOs in separate fields.
 *
 * In later refactors, *TaskMetricsProfileResult can be revisited to carry integer IDs instead
 * of the current "stage_ID" / "job_ID" string format. The old format is still used to stay
 * compatible with other modules.
 *
 * @param jobAggs           the aggregated Spark metrics for jobs
 * @param stageAggs         the aggregated Spark metrics for stages
 * @param taskShuffleSkew   list of tasks that exhibit shuffle skewness
 * @param sqlAggs           the aggregated Spark metrics for SQLs
 * @param ioAggs            lists the SQLs along with their IO metrics
 * @param sqlDurAggs        the aggregated duration and CPU time for SQLs
 * @param maxTaskInputSizes a sequence of SQLMaxTaskInputSizes holding the maximum task input
 *                          size (NOTE(review): granularity is not visible here; confirm
 *                          against the definition of SQLMaxTaskInputSizes)
 */
case class AggRawMetricsResult(
    jobAggs: Seq[JobStageAggTaskMetricsProfileResult],
    stageAggs: Seq[JobStageAggTaskMetricsProfileResult],
    taskShuffleSkew: Seq[ShuffleSkewProfileResult],
    sqlAggs: Seq[SQLTaskAggMetricsProfileResult],
    ioAggs: Seq[IOAnalysisProfileResult],
    sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
    maxTaskInputSizes: Seq[SQLMaxTaskInputSizes])
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis

import org.apache.spark.sql.rapids.tool.AppBase

/**
 * Common parent for the classes that analyze an application.
 *
 * @param app the AppBase object to analyze
 */
abstract class AppAnalysisBase(app: AppBase) {
  // Intentionally empty for now: kept as an anchor for a future refactoring that moves the
  // methods shared by all Analysis classes into this base class.
  // Ideally, it would provide a common interface for:
  //   1- a caching layer
  //   2- initializations
  //   3- pulling the information needed to generate views and reports
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.analysis

import org.apache.spark.sql.rapids.tool.AppBase
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

/**
 * Strategy for associating each application with an integer index.
 * Implementations decide where the index comes from.
 */
trait AppIndexMapperTrait {
  /**
   * Pairs applications with their index.
   *
   * @param apps the applications to index
   * @return a sequence of (application, index) tuples
   */
  def zipAppsWithIndex(apps: Seq[AppBase]): Seq[(AppBase, Int)]
}

/**
 * Implementation used by Qualification components because AppBase has no appIndex field.
 * Instead, the index is generated from the position of each app in the input sequence.
 */
trait QualAppIndexMapperTrait extends AppIndexMapperTrait {
  /**
   * Pairs every app with its 1-based position in `apps`.
   *
   * @param apps the applications to index
   * @return one (application, index) tuple per input element, in the same order, where the
   *         first element gets index 1
   */
  override def zipAppsWithIndex(apps: Seq[AppBase]): Seq[(AppBase, Int)] = {
    // zipWithIndex is 0-based, so shift by one to start from 1. This avoids scala.Stream,
    // which is deprecated since Scala 2.13, while remaining valid on Scala 2.12.
    apps.zipWithIndex.map { case (app, zeroBasedIdx) => (app, zeroBasedIdx + 1) }
  }
}

/**
 * Implementation used by Profiling components: the index is read from each ApplicationInfo
 * and is used in generating reports with multiple AppIds.
 */
trait ProfAppIndexMapperTrait extends AppIndexMapperTrait {
  /**
   * Pairs every ApplicationInfo found in `apps` with its own `index` value.
   * Elements of `apps` that are not ApplicationInfo instances are left out of the result.
   *
   * @param apps the applications to index
   * @return the (application, index) tuples of the ApplicationInfo elements, in input order
   */
  override def zipAppsWithIndex(apps: Seq[AppBase]): Seq[(AppBase, Int)] = {
    apps.collect {
      case profApp: ApplicationInfo => profApp -> profApp.index
    }
  }
}
Loading

0 comments on commit 0cc500f

Please sign in to comment.