Qualification tool support for read and write execs and more, add mapping stage times to sql execs #5440

Merged: 86 commits, May 10, 2022

Commits
041a9c6
calculate the other time based on stage time
tgravescs May 2, 2022
7b62a2d
change away from per stage calculation
tgravescs May 2, 2022
eeb67cd
fixes
tgravescs May 2, 2022
a825174
fix attempt id
tgravescs May 2, 2022
e46d009
more logging
tgravescs May 2, 2022
349e96c
change way duration execs handled
tgravescs May 2, 2022
f8dd75d
Add in FileSourcescan
tgravescs May 2, 2022
dff6d16
Add BatchScan
tgravescs May 2, 2022
3ea327e
add InsertIntoHadoopFsRelationCommand
tgravescs May 2, 2022
e1c73db
more reads
tgravescs May 3, 2022
5eff01c
move logic parsing
tgravescs May 3, 2022
46a374a
Add separate ReadParser
tgravescs May 3, 2022
cff2cd7
Merge branch 'branch-22.06' of github.com:NVIDIA/spark-rapids into qu…
tgravescs May 3, 2022
e8d3486
Merge branch 'qualStageDur' of github.com:tgravescs/spark-rapids into…
tgravescs May 3, 2022
23cf10d
score read
tgravescs May 3, 2022
e3a0f14
update BatchScan
tgravescs May 3, 2022
3acb7f4
start write formatting
tgravescs May 3, 2022
1981acf
debug
tgravescs May 3, 2022
6fbb66c
finish updating writes
tgravescs May 3, 2022
d0c2f73
Adding tests scan
tgravescs May 3, 2022
ead253a
update test
tgravescs May 3, 2022
0d7ecc2
fix 0 speedup
tgravescs May 3, 2022
93720ec
update more tests
tgravescs May 3, 2022
ecd7e4b
reenable the reads checks
tgravescs May 3, 2022
9ee035f
fix failure checking
tgravescs May 3, 2022
3e1ccea
add test for batchscan
tgravescs May 3, 2022
faab1e6
update test
tgravescs May 3, 2022
5657e73
have batrch scan print format
tgravescs May 3, 2022
98c31c4
fix speedup < 1
tgravescs May 3, 2022
5bc8f9d
start test for InsertIntoHadoopFsRelationCommand
tgravescs May 3, 2022
51071b1
update write test
tgravescs May 3, 2022
23acb67
use lower case
tgravescs May 3, 2022
9846a56
add test for CreateDataSourceTableAsSelectCommand
tgravescs May 3, 2022
67523a0
update CreateTable
tgravescs May 3, 2022
4625f68
in memory table scan test and test cleanup
tgravescs May 4, 2022
7c3b88f
fixes
tgravescs May 4, 2022
ba9f66d
update Create test
tgravescs May 4, 2022
6a3e75e
use try finally
tgravescs May 4, 2022
61cdab5
change to use eventlog
tgravescs May 4, 2022
04eb645
add event log for create table
tgravescs May 4, 2022
9366a23
fix test
tgravescs May 4, 2022
eb60508
fix test
tgravescs May 4, 2022
f8d7aa1
Add Exchange support
tgravescs May 4, 2022
8c94bdc
Adding in broadcast and subquery broadcast
tgravescs May 4, 2022
dfedeab
debug
tgravescs May 4, 2022
71d0c53
Change to get driver accums
tgravescs May 4, 2022
b98252e
fix subquery
tgravescs May 4, 2022
ba3e673
fix metric name subquerybroadcast
tgravescs May 4, 2022
652ce20
Add event logs for aqe and custom shuffle
tgravescs May 4, 2022
7047400
Add custom shuffle reader
tgravescs May 4, 2022
7f89564
update test
tgravescs May 4, 2022
12c6ec6
fix test
tgravescs May 4, 2022
6acca66
cleanup case statement
tgravescs May 4, 2022
824154d
fix cast
tgravescs May 4, 2022
ad81426
Merge remote-tracking branch 'origin/branch-22.06' into qualStageDur
tgravescs May 4, 2022
866d51e
Refactor stages in sql node
tgravescs May 4, 2022
de75329
change stages from option
tgravescs May 4, 2022
858cde6
fix flatten
tgravescs May 4, 2022
599f9e4
testing
tgravescs May 4, 2022
62e38cf
Merge remote-tracking branch 'origin/branch-22.06' into qualStageDur
tgravescs May 5, 2022
3610168
fix merge
tgravescs May 5, 2022
09bcf03
cleanup and filter execs should be removed
tgravescs May 5, 2022
681026c
update exchange duration test
tgravescs May 5, 2022
0bfa5a3
move exchange test
tgravescs May 5, 2022
f3f142e
debug
tgravescs May 5, 2022
79c3369
convert nanos to ms
tgravescs May 5, 2022
2f90b55
fix timings
tgravescs May 5, 2022
2fe7e4e
comment
tgravescs May 5, 2022
9fbc600
Merge remote-tracking branch 'origin/branch-22.06' into qualStageDur
tgravescs May 5, 2022
110cd12
fix return types
tgravescs May 5, 2022
f798dce
fix seq returned
tgravescs May 5, 2022
58f0828
fix tests
tgravescs May 5, 2022
097179c
fix tests
tgravescs May 5, 2022
da17434
add debug text
tgravescs May 5, 2022
733a98e
add helper
tgravescs May 5, 2022
7e092af
filter children
tgravescs May 5, 2022
e07198d
change wholestagecodegen speedup to remove not supported
tgravescs May 5, 2022
0f4ce31
debug
tgravescs May 5, 2022
5ce8074
fix bug
tgravescs May 5, 2022
6526b37
cleanup
tgravescs May 5, 2022
5b52d9a
turn logs to debug
tgravescs May 5, 2022
bc68c08
update logging
tgravescs May 5, 2022
e52ad7b
cleanup
tgravescs May 9, 2022
7c1c657
Review comments
tgravescs May 9, 2022
507c17d
fix exec name
tgravescs May 9, 2022
282e497
review comments
tgravescs May 9, 2022
Files changed
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.AppBase

case class BatchScanExecParser(
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long,
app: AppBase) extends ExecParser with Logging {

val fullExecName = "BatchScanExec"

override def parse: ExecInfo = {
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)
val readInfo = ReadParser.parseReadNode(node)
// don't use isExecSupported here because reads are scored at a finer grain.
val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
val speedupFactor = checker.getSpeedupFactor(fullExecName)
val overallSpeedup = Math.max((speedupFactor * score).toInt, 1)

// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, s"${node.name} ${readInfo.format}", "", overallSpeedup,
maxDuration, node.id, score > 0, None)
}
}
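A rough worked example of the score-based speedup above; the numbers are hypothetical stand-ins for what PluginTypeChecker and ReadParser would return, shown only to illustrate the clamping arithmetic.

// Hypothetical values, only to illustrate the arithmetic in BatchScanExecParser.parse.
val speedupFactor = 3     // assumed checker.getSpeedupFactor("BatchScanExec")
val score = 0.5           // assumed ReadParser.calculateReadScoreRatio for a partially supported format
val overallSpeedup = Math.max((speedupFactor * score).toInt, 1)   // (3 * 0.5).toInt == 1
// A fully unsupported read yields score == 0, so the exec is reported with speedup 1
// and marked unsupported (the score > 0 check above is the supported flag).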
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.AppBase

case class BroadcastExchangeExecParser(
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long,
app: AppBase) extends ExecParser with Logging {

val fullExecName = node.name + "Exec"

override def parse: ExecInfo = {
// TODO - check the relation to see if really supported
val collectTimeId = node.metrics.find(_.name == "time to collect").map(_.accumulatorId)
val buildTimeId = node.metrics.find(_.name == "time to build").map(_.accumulatorId)
val broadcastTimeId = node.metrics.find(_.name == "time to broadcast").map(_.accumulatorId)
val maxCollectTime = SQLPlanParser.getDriverTotalDuration(collectTimeId, app)
val maxBuildTime = SQLPlanParser.getDriverTotalDuration(buildTimeId, app)
val maxBroadcastTime = SQLPlanParser.getDriverTotalDuration(broadcastTimeId, app)
val duration = (maxCollectTime ++ maxBuildTime ++ maxBroadcastTime).reduceOption(_ + _)
val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", filterSpeedupFactor,
duration, node.id, isSupported, None)
}
}
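A minimal sketch of how the three optional driver-side totals above fold into one duration; the Option values are made up, but the ++ / reduceOption behavior is plain Scala.

// Hypothetical totals; in the parser they come from SQLPlanParser.getDriverTotalDuration.
val maxCollectTime: Option[Long] = Some(120L)
val maxBuildTime: Option[Long] = None            // metric absent for this node
val maxBroadcastTime: Option[Long] = Some(30L)
// Options concatenate into an Iterable of the defined values, so missing metrics are
// simply skipped; if all three are None, reduceOption returns None and the exec has no duration.
val duration = (maxCollectTime ++ maxBuildTime ++ maxBroadcastTime).reduceOption(_ + _)
// duration == Some(150L)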
@@ -27,7 +27,7 @@ case class CartesianProductExecParser(

val fullExecName = node.name + "Exec"

override def parse: Seq[ExecInfo] = {
override def parse: ExecInfo = {
// CartesianProduct doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
@@ -36,7 +36,6 @@
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
}
@@ -27,7 +27,7 @@ case class CoalesceExecParser(

val fullExecName = node.name + "Exec"

override def parse: Seq[ExecInfo] = {
override def parse: ExecInfo = {
// coalesce doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
@@ -36,7 +36,7 @@
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None)
}
}
@@ -27,7 +27,7 @@ case class CollectLimitExecParser(

val fullExecName = node.name + "Exec"

override def parse: Seq[ExecInfo] = {
override def parse: ExecInfo = {
// collect doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
@@ -36,7 +36,7 @@
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None)
}
}
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

case class CustomShuffleReaderExecParser(
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long) extends ExecParser {

// note this is called either AQEShuffleRead or CustomShuffleReader depending
// on the Spark version; our supported ops list has it as CustomShuffleReader
val fullExecName = "CustomShuffleReaderExec"

override def parse: ExecInfo = {
// doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
}
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
}
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

case class DataWritingCommandExecParser(
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long) extends ExecParser {

// hardcoded because InsertIntoHadoopFsRelationCommand uses this same exec
// and InsertIntoHadoopFsRelationCommand doesn't have its own entry in the
// supported execs file
val fullExecName = "DataWritingCommandExec"

override def parse: ExecInfo = {
val writeFormat = node.desc.split(",")(2)
val writeSupported = checker.isWriteFormatsupported(writeFormat)
val duration = None
val speedupFactor = checker.getSpeedupFactor(fullExecName)
val finalSpeedup = if (writeSupported) speedupFactor else 1
// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, s"${node.name.trim} ${writeFormat.toLowerCase.trim}", "", finalSpeedup,
duration, node.id, writeSupported, None)
}
}
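To illustrate the write-format extraction above, here is a hypothetical node.desc for an InsertIntoHadoopFsRelationCommand node; the exact text varies by Spark version and is not copied from a real event log, but the parser assumes the format is the third comma-separated field.

// Hypothetical, shortened plan-node description.
val desc = "Execute InsertIntoHadoopFsRelationCommand file:/tmp/out, false, Parquet, Map(path -> /tmp/out), Append"
val writeFormat = desc.split(",")(2)            // " Parquet"
val normalized = writeFormat.toLowerCase.trim   // "parquet", checked against the supported write formats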
@@ -17,6 +17,6 @@
package com.nvidia.spark.rapids.tool.planparser

trait ExecParser {
def parse: Seq[ExecInfo]
def parse: ExecInfo
val fullExecName: String
}
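For context on the Seq[ExecInfo] to ExecInfo signature change: a hypothetical caller sketch (names assumed, not taken from this PR) showing why the extra wrapping and flattening goes away once every parser returns a single ExecInfo.

// Hypothetical caller; `parsers` stands in for however SQLPlanParser gathers its ExecParser instances.
def collectExecInfo(parsers: Seq[ExecParser]): Seq[ExecInfo] = {
  // Before this change each parse returned Seq[ExecInfo] and callers needed parsers.flatMap(_.parse);
  // now every parser produces exactly one ExecInfo, so a plain map suffices.
  parsers.map(_.parse)
}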
@@ -27,7 +27,7 @@ case class ExpandExecParser(

val fullExecName = node.name + "Exec"

override def parse: Seq[ExecInfo] = {
override def parse: ExecInfo = {
// Expand doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
@@ -36,7 +36,6 @@
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
}
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.AppBase

case class FileSourceScanExecParser(
node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long,
app: AppBase) extends ExecParser with Logging {

// The node name for scans is "Scan <format>", so we hardcode the exec name here
val fullExecName = "FileSourceScanExec"

override def parse: ExecInfo = {
val accumId = node.metrics.find(_.name == "scan time").map(_.accumulatorId)
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)

val readInfo = ReadParser.parseReadNode(node)
// don't use isExecSupported here because reads are scored at a finer grain.
val score = ReadParser.calculateReadScoreRatio(readInfo, checker)
val speedupFactor = checker.getSpeedupFactor(fullExecName)
val overallSpeedup = Math.max((speedupFactor * score).toInt, 1)

// TODO - add in parsing expressions - average speedup across?
ExecInfo(sqlID, node.name, "", overallSpeedup, maxDuration, node.id, score > 0, None)
}
}
@@ -27,7 +27,7 @@ case class FilterExecParser(

val fullExecName = node.name + "Exec"

override def parse: Seq[ExecInfo] = {
override def parse: ExecInfo = {
// filter doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
@@ -36,7 +36,6 @@
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
}
@@ -27,7 +27,7 @@ case class GenerateExecParser(

val fullExecName = node.name + "Exec"

override def parse: Seq[ExecInfo] = {
override def parse: ExecInfo = {
// Generate doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
@@ -36,7 +36,6 @@
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
}
@@ -27,7 +27,7 @@ case class GlobalLimitExecParser(

val fullExecName = node.name + "Exec"

override def parse: Seq[ExecInfo] = {
override def parse: ExecInfo = {
// GlobalLimit doesn't have duration
val duration = None
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
@@ -36,7 +36,6 @@
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
ExecInfo(sqlID, node.name, "", speedupFactor, duration, node.id, isSupported, None)
}
}