From c854b781f2d33b9af7f845407a51cead8d45c413 Mon Sep 17 00:00:00 2001 From: Zhen Li <10524738+zhli1142015@users.noreply.github.com> Date: Thu, 7 Mar 2024 08:24:03 +0800 Subject: [PATCH] [VL] minor change for delta ut (#4869) --- .../execution/VeloxDeltaSuite.scala | 154 ++++++++++-------- 1 file changed, 83 insertions(+), 71 deletions(-) diff --git a/gluten-delta/src/test/scala/io/glutenproject/execution/VeloxDeltaSuite.scala b/gluten-delta/src/test/scala/io/glutenproject/execution/VeloxDeltaSuite.scala index a0d813e6b4ae9..c41f1f889edde 100644 --- a/gluten-delta/src/test/scala/io/glutenproject/execution/VeloxDeltaSuite.scala +++ b/gluten-delta/src/test/scala/io/glutenproject/execution/VeloxDeltaSuite.scala @@ -44,93 +44,105 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite { // IdMapping is supported in Delta 2.2 (related to Spark3.3.1) testWithSpecifiedSparkVersion("column mapping mode = id", Some("3.3.1")) { - spark.sql(s""" - |create table delta_cm1 (id int, name string) using delta - |tblproperties ("delta.columnMapping.mode"= "id") - |""".stripMargin) - spark.sql(s""" - |insert into delta_cm1 values (1, "v1"), (2, "v2") - |""".stripMargin) - val df1 = runQueryAndCompare("select * from delta_cm1") { _ => } - checkLengthAndPlan(df1, 2) - checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) - - val df2 = runQueryAndCompare("select name from delta_cm1 where id = 2") { _ => } - checkLengthAndPlan(df2, 1) - checkAnswer(df2, Row("v2") :: Nil) + withTable("delta_cm1") { + spark.sql(s""" + |create table delta_cm1 (id int, name string) using delta + |tblproperties ("delta.columnMapping.mode"= "id") + |""".stripMargin) + spark.sql(s""" + |insert into delta_cm1 values (1, "v1"), (2, "v2") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from delta_cm1") { _ => } + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) + + val df2 = runQueryAndCompare("select name from delta_cm1 where id = 2") { _ => } + checkLengthAndPlan(df2, 1) + checkAnswer(df2, Row("v2") :: Nil) + } } // NameMapping is supported in Delta 2.0 (related to Spark3.2.0) testWithSpecifiedSparkVersion("column mapping mode = name", Some("3.2.0")) { - spark.sql(s""" - |create table delta_cm2 (id int, name string) using delta - |tblproperties ("delta.columnMapping.mode"= "name") - |""".stripMargin) - spark.sql(s""" - |insert into delta_cm2 values (1, "v1"), (2, "v2") - |""".stripMargin) - val df1 = runQueryAndCompare("select * from delta_cm2") { _ => } - checkLengthAndPlan(df1, 2) - checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) - - val df2 = runQueryAndCompare("select name from delta_cm2 where id = 2") { _ => } - checkLengthAndPlan(df2, 1) - checkAnswer(df2, Row("v2") :: Nil) + withTable("delta_cm2") { + spark.sql(s""" + |create table delta_cm2 (id int, name string) using delta + |tblproperties ("delta.columnMapping.mode"= "name") + |""".stripMargin) + spark.sql(s""" + |insert into delta_cm2 values (1, "v1"), (2, "v2") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from delta_cm2") { _ => } + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) + + val df2 = runQueryAndCompare("select name from delta_cm2 where id = 2") { _ => } + checkLengthAndPlan(df2, 1) + checkAnswer(df2, Row("v2") :: Nil) + } } test("delta: time travel") { - spark.sql(s""" - |create table delta_tm (id int, name string) using delta - |""".stripMargin) - spark.sql(s""" - |insert into delta_tm values (1, "v1"), (2, "v2") - |""".stripMargin) - spark.sql(s""" - |insert into delta_tm values (3, "v3"), (4, "v4") - |""".stripMargin) - val df1 = runQueryAndCompare("select * from delta_tm VERSION AS OF 1") { _ => } - checkLengthAndPlan(df1, 2) - checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) - val df2 = runQueryAndCompare("select * from delta_tm VERSION AS OF 2") { _ => } - checkLengthAndPlan(df2, 4) - checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil) - val df3 = runQueryAndCompare("select name from delta_tm VERSION AS OF 2 where id = 2") { _ => } - checkLengthAndPlan(df3, 1) - checkAnswer(df3, Row("v2") :: Nil) + withTable("delta_tm") { + spark.sql(s""" + |create table delta_tm (id int, name string) using delta + |""".stripMargin) + spark.sql(s""" + |insert into delta_tm values (1, "v1"), (2, "v2") + |""".stripMargin) + spark.sql(s""" + |insert into delta_tm values (3, "v3"), (4, "v4") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from delta_tm VERSION AS OF 1") { _ => } + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) + val df2 = runQueryAndCompare("select * from delta_tm VERSION AS OF 2") { _ => } + checkLengthAndPlan(df2, 4) + checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil) + val df3 = runQueryAndCompare("select name from delta_tm VERSION AS OF 2 where id = 2") { + _ => + } + checkLengthAndPlan(df3, 1) + checkAnswer(df3, Row("v2") :: Nil) + } } test("delta: partition filters") { - spark.sql(s""" - |create table delta_pf (id int, name string) using delta partitioned by (name) - |""".stripMargin) - spark.sql(s""" - |insert into delta_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") - |""".stripMargin) - val df1 = runQueryAndCompare("select * from delta_pf where name = 'v1'") { _ => } - val deltaScanTransformer = df1.queryExecution.executedPlan.collect { - case f: FileSourceScanExecTransformer => f - }.head - // No data filters as only partition filters exist - assert(deltaScanTransformer.filterExprs().size == 0) - checkLengthAndPlan(df1, 2) - checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil) - } - - test("basic test with stats.skipping disabled") { - withSQLConf("spark.databricks.delta.stats.skipping" -> "false") { + withTable("delta_pf") { spark.sql(s""" - |create table delta_test2 (id int, name string) using delta + |create table delta_pf (id int, name string) using delta partitioned by (name) |""".stripMargin) spark.sql(s""" - |insert into delta_test2 values (1, "v1"), (2, "v2") + |insert into delta_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") |""".stripMargin) - val df1 = runQueryAndCompare("select * from delta_test2") { _ => } + val df1 = runQueryAndCompare("select * from delta_pf where name = 'v1'") { _ => } + val deltaScanTransformer = df1.queryExecution.executedPlan.collect { + case f: FileSourceScanExecTransformer => f + }.head + // No data filters as only partition filters exist + assert(deltaScanTransformer.filterExprs().size == 0) checkLengthAndPlan(df1, 2) - checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) + checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil) + } + } - val df2 = runQueryAndCompare("select name from delta_test2 where id = 2") { _ => } - checkLengthAndPlan(df2, 1) - checkAnswer(df2, Row("v2") :: Nil) + test("basic test with stats.skipping disabled") { + withTable("delta_test2") { + withSQLConf("spark.databricks.delta.stats.skipping" -> "false") { + spark.sql(s""" + |create table delta_test2 (id int, name string) using delta + |""".stripMargin) + spark.sql(s""" + |insert into delta_test2 values (1, "v1"), (2, "v2") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from delta_test2") { _ => } + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) + + val df2 = runQueryAndCompare("select name from delta_test2 where id = 2") { _ => } + checkLengthAndPlan(df2, 1) + checkAnswer(df2, Row("v2") :: Nil) + } } }