Qualification should treat promote_precision as supported (#545)
* Qualification should treat promote_precision as supported

Fixes #517

- Added the `promote_precision` value to the `SQL Func` column of the
  `PromotePrecision` rows
- Added a new unit test to verify that promote_precision is reported as
  supported for Spark versions earlier than 3.4.0

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

amahussein authored Sep 8, 2023
1 parent 61460fd commit 7982701
Showing 3 changed files with 45 additions and 3 deletions.
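
For context before the diffs: on Spark releases earlier than 3.4.0, decimal arithmetic wraps its operands in promote_precision in the physical plan, which is what issue #517 reported as unsupported by the qualification tool. The snippet below is a minimal, hedged reproduction of that plan shape; the object name and local session setup are illustrative, while the data and casts mirror the new unit test added further down.

// Hedged reproduction sketch of the plan shape behind issue #517.
// The SparkSession setup and object name are illustrative only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

object PromotePrecisionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("promote-precision-demo")
      .getOrCreate()
    import spark.implicits._

    // Same data and decimal casts as the new unit test in this commit.
    val df = Seq(("12347.21", "1234154"), ("92233.08", "1")).toDF("_1", "_2")
      .withColumn("dec1", col("_1").cast(DecimalType(7, 2)))
      .withColumn("dec2", col("_2").cast(DecimalType(10, 0)))

    // On Spark < 3.4.0 the Project node shows something like:
    //   CheckOverflow((promote_precision(cast(dec1 as decimal(13,2))) +
    //                  promote_precision(cast(dec2 as decimal(13,2)))), DecimalType(13,2))
    // On Spark 3.4.0+ promote_precision is gone and the projection is simply (dec1 + dec2).
    df.selectExpr("dec1 + dec2").explain()

    spark.stop()
  }
}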
4 changes: 2 additions & 2 deletions core/src/main/resources/supportedExprs.csv
@@ -377,8 +377,8 @@ Pow,S,`pow`; `power`,None,AST,rhs,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
 Pow,S,`pow`; `power`,None,AST,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
 PreciseTimestampConversion,S, ,None,project,input,NA,NA,NA,NA,S,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA
 PreciseTimestampConversion,S, ,None,project,result,NA,NA,NA,NA,S,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,NA,NA
-PromotePrecision,S, ,None,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA
-PromotePrecision,S, ,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA
+PromotePrecision,S,`promote_precision`,None,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA
+PromotePrecision,S,`promote_precision`,None,project,result,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA
 PythonUDF,S, ,None,aggregation,param,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,PS,NS
 PythonUDF,S, ,None,aggregation,result,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NA,PS,NS,PS,NA
 PythonUDF,S, ,None,reduction,param,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,PS,NS
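
Each row in supportedExprs.csv ties an expression name to its backtick-quoted SQL function aliases (third column, `;`-separated) and per-datatype support flags (S, PS, NS, NA). The sketch below is a toy illustration of why adding `promote_precision` to that column matters; it is not the tool's actual PluginTypeChecker loader.

// Hedged sketch only: a toy parser for a supportedExprs.csv-style row, showing how a
// SQL function name in the third column maps back to its expression entry.
object SupportedExprsSketch {
  final case class ExprRow(exprName: String, supportedFlag: String, sqlFuncs: Seq[String])

  def parseRow(line: String): ExprRow = {
    val cols = line.split(",", -1)
    // Third column holds backtick-quoted aliases separated by ';', e.g. `pow`; `power`
    val funcs = cols(2).split(";")
      .map(_.trim.stripPrefix("`").stripSuffix("`"))
      .filter(_.nonEmpty)
      .toSeq
    ExprRow(cols(0), cols(1), funcs)
  }

  def main(args: Array[String]): Unit = {
    val row = parseRow(
      "PromotePrecision,S,`promote_precision`,None,project,input,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA")
    // With the alias in place, a plan expression named promote_precision resolves to the
    // PromotePrecision row, so qualification no longer reports it as unsupported.
    println(row.sqlFuncs.contains("promote_precision")) // true
  }
}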
BaseTestSuite.scala
@@ -68,6 +68,11 @@ class BaseTestSuite extends FunSuite with BeforeAndAfterEach with Logging {
       "Spark340 does not parse the eventlog correctly")
   }
 
+  protected def ignoreExprForSparkGTE340(): (Boolean, String) = {
+    (!ToolUtils.isSpark340OrLater(),
+      "Spark340+ does not support the expression")
+  }
+
   def runConditionalTest(testName: String, assumeCondition: () => (Boolean, String))
       (fun: => Unit): Unit = {
     val (isAllowed, ignoreMessage) = assumeCondition()
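
The new gate is meant to be passed to runConditionalTest, whose body is truncated in this hunk. The sketch below shows the usual ScalaTest pattern such a helper follows, registering the test when the condition holds and otherwise registering it as ignored; it is an assumption, not the repository's exact implementation.

// Hedged sketch of the conditional-registration pattern; assumes ScalaTest FunSuite's
// test(...) and ignore(...) registration methods. The real runConditionalTest may differ.
import org.scalatest.FunSuite

abstract class ConditionalSuiteSketch extends FunSuite {
  def runConditionalTestSketch(testName: String, assumeCondition: () => (Boolean, String))
      (fun: => Unit): Unit = {
    val (isAllowed, ignoreMessage) = assumeCondition()
    if (isAllowed) {
      // Condition holds (e.g. running on Spark < 3.4.0): register the test normally.
      test(testName)(fun)
    } else {
      // Condition fails: keep the test visible but skipped, with the reason in the name.
      ignore(s"$testName ($ignoreMessage)")(fun)
    }
  }
}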
SQLPlanParserSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.functions.{ceil, col, collect_list, count, explode,
 import org.apache.spark.sql.rapids.tool.ToolUtils
 import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
 import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil
-import org.apache.spark.sql.types.StringType
 
 
 class SQLPlanParserSuite extends BaseTestSuite {
@@ -864,6 +863,7 @@ class SQLPlanParserSuite extends BaseTestSuite {
         val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,
           "ProjectExprsSupported") { spark =>
           import spark.implicits._
+          import org.apache.spark.sql.types.StringType
           val df1 = Seq(9.9, 10.2, 11.6, 12.5).toDF("value")
           df1.write.parquet(s"$parquetoutputLoc/testtext")
           val df2 = spark.read.parquet(s"$parquetoutputLoc/testtext")
@@ -1036,4 +1036,41 @@
     val expressions = SQLPlanParser.parseAggregateExpressions(exprString)
     expressions should ===(expected)
   }
+
+  runConditionalTest("promote_precision is supported for Spark LT 3.4.0: issue-517",
+    ignoreExprForSparkGTE340) {
+    // Spark-3.4.0 removed the promote_precision SQL function
+    // the SQL generates the following physical plan
+    // (1) Project [CheckOverflow((promote_precision(cast(dec1#24 as decimal(13,2)))
+    //     + promote_precision(cast(dec2#25 as decimal(13,2)))), DecimalType(13,2))
+    //     AS (dec1 + dec2)#30]
+    // For Spark3.4.0, the promote_precision was removed from the plan.
+    // (1) Project [(dec1#24 + dec2#25) AS (dec1 + dec2)#30]
+    TrampolineUtil.withTempDir { parquetoutputLoc =>
+      TrampolineUtil.withTempDir { eventLogDir =>
+        val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir,
+          "projectPromotePrecision") { spark =>
+          import spark.implicits._
+          import org.apache.spark.sql.types.DecimalType
+          val df = Seq(("12347.21", "1234154"), ("92233.08", "1")).toDF
+            .withColumn("dec1", col("_1").cast(DecimalType(7, 2)))
+            .withColumn("dec2", col("_2").cast(DecimalType(10, 0)))
+          // write the df to parquet to transform localTableScan to projectExec
+          df.write.parquet(s"$parquetoutputLoc/testPromotePrecision")
+          val df2 = spark.read.parquet(s"$parquetoutputLoc/testPromotePrecision")
+          df2.selectExpr("dec1+dec2")
+        }
+        val pluginTypeChecker = new PluginTypeChecker()
+        val app = createAppFromEventlog(eventLog)
+        val parsedPlans = app.sqlPlans.map { case (sqlID, plan) =>
+          SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app)
+        }
+        // The promote_precision should be part of the project exec.
+        val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq)
+        val projExecs = allExecInfo.filter(_.exec.contains("Project"))
+        assertSizeAndSupported(1, projExecs)
+      }
+    }
+  }
 }
