From 6dd119889b2066619b3a65c4424557273cae4230 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 24 May 2021 11:10:05 -0600 Subject: [PATCH 1/3] Fix regression in cost-based optimizer when calculating cost for Window operations Signed-off-by: Andy Grove --- .../spark/rapids/CostBasedOptimizer.scala | 17 +++++++++- .../rapids/CostBasedOptimizerSuite.scala | 32 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala index d1bb3596bab..05d4ec7c112 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ListBuffer import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetStructField} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetStructField, WindowFrame, WindowSpecDefinition} import org.apache.spark.sql.catalyst.plans.{JoinType, LeftAnti, LeftSemi} import org.apache.spark.sql.execution.{GlobalLimitExec, LocalLimitExec, SparkPlan, TakeOrderedAndProjectExec, UnionExec} import org.apache.spark.sql.execution.adaptive.{CustomShuffleReaderExec, QueryStageExec} @@ -262,6 +262,10 @@ class CpuCostModel(conf: RapidsConf) extends CostModel { } private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT], rowCount: Double): Double = { + if (MemoryCostHelper.isWindowExpr(expr)) { + // Window expressions are Unevaluable and accessing dataType causes an exception + return 0 + } val memoryReadCost = expr.wrapped match { case _: Alias => @@ -308,6 +312,10 @@ class GpuCostModel(conf: RapidsConf) extends CostModel { } private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT], 
rowCount: Double): Double = { + if (MemoryCostHelper.isWindowExpr(expr)) { + // Window expressions are Unevaluable and accessing dataType causes an exception + return 0 + } var memoryReadCost = 0d var memoryWriteCost = 0d @@ -351,6 +359,13 @@ object MemoryCostHelper { def calculateCost(dataSize: Long, memorySpeed: Double): Double = { (dataSize / GIGABYTE) / memorySpeed } + + def isWindowExpr[INPUT <: Expression](expr: BaseExprMeta[INPUT]) = { + expr.wrapped match { + case _: WindowSpecDefinition | _: WindowFrame => true + case _ => false + } + } } /** diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala index bd9d274e58b..16e1edf32a5 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala @@ -514,6 +514,38 @@ class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite assert(broadcastExchanges.forall(_._2.toLong > 0)) } + // The purpose of this test is simply to make sure that we do not try and calculate the cost + // of expressions that cannot be evaluated. This can cause exceptions when estimating data + // sizes. For example, WindowFrame.dataType throws UnsupportedOperationException. 
+ test("Window expression (unevaluable)") { + logError("Window expression (unevaluable)") + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,WindowExec,ShuffleExchangeExec,WindowSpecDefinition," + + "SpecifiedWindowFrame,WindowExpression,Alias,Rank,HashPartitioning," + + "UnboundedPreceding$,CurrentRow$,SortExec") + + withGpuSparkSession(spark => { + employeeDf(spark).createOrReplaceTempView("employees") + val df: DataFrame = spark.sql("SELECT name, dept, RANK() " + + "OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees") + df.collect() + }, conf) + } + + private def employeeDf(session: SparkSession): DataFrame = { + import session.sqlContext.implicits._ + Seq( + ("A", "IT", 1234), + ("B", "IT", 4321), + ("C", "Sales", 4321), + ("D", "Sales", 1234) + ).toDF("name", "dept", "salary") + } + private def collectPlansWithRowCount( plan: SparkPlanMeta[_], accum: ListBuffer[SparkPlanMeta[_]]): Unit = { From be8b669ad0bb6ec8b6d8a0f12e323535197cbae1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 24 May 2021 13:51:01 -0600 Subject: [PATCH 2/3] Rename method Signed-off-by: Andy Grove --- .../com/nvidia/spark/rapids/CostBasedOptimizer.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala index 05d4ec7c112..b0ec4a4e31e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala @@ -262,8 +262,7 @@ class CpuCostModel(conf: RapidsConf) extends CostModel { } private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT], rowCount: Double): Double = { - if (MemoryCostHelper.isWindowExpr(expr)) { - // Window expressions are 
Unevaluable and accessing dataType causes an exception + if (MemoryCostHelper.isExcludedFromCost(expr)) { return 0 } @@ -312,8 +311,7 @@ class GpuCostModel(conf: RapidsConf) extends CostModel { } private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT], rowCount: Double): Double = { - if (MemoryCostHelper.isWindowExpr(expr)) { - // Window expressions are Unevaluable and accessing dataType causes an exception + if (MemoryCostHelper.isExcludedFromCost(expr)) { return 0 } @@ -360,9 +358,11 @@ object MemoryCostHelper { (dataSize / GIGABYTE) / memorySpeed } - def isWindowExpr[INPUT <: Expression](expr: BaseExprMeta[INPUT]) = { + def isExcludedFromCost[INPUT <: Expression](expr: BaseExprMeta[INPUT]) = { expr.wrapped match { - case _: WindowSpecDefinition | _: WindowFrame => true + case _: WindowSpecDefinition | _: WindowFrame => + // Window expressions are Unevaluable and accessing dataType causes an exception + true case _ => false } } From 253b615e714dcbd556051c0a6a5738c4ecc3cb9a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 24 May 2021 14:02:11 -0600 Subject: [PATCH 3/3] Use 0d instead of 0 Signed-off-by: Andy Grove --- .../scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala index b0ec4a4e31e..fe008dfdb3b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala @@ -263,7 +263,7 @@ class CpuCostModel(conf: RapidsConf) extends CostModel { private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT], rowCount: Double): Double = { if (MemoryCostHelper.isExcludedFromCost(expr)) { - return 0 + return 0d } val memoryReadCost = expr.wrapped match { @@ -312,7 +312,7 @@ class GpuCostModel(conf: RapidsConf) extends 
CostModel { private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT], rowCount: Double): Double = { if (MemoryCostHelper.isExcludedFromCost(expr)) { - return 0 + return 0d } var memoryReadCost = 0d