Skip to content

Commit

Permalink
[VL] minor change for delta ut (apache#4869)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 authored and taiyang-li committed Mar 25, 2024
1 parent c4a4d1d commit c854b78
Showing 1 changed file with 83 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,93 +44,105 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {

// Id column mapping is supported in Delta 2.2 (corresponding to Spark 3.3.1)
// Reads back a Delta table that uses id-based column mapping, once as a full scan and
// once as a filtered single-column projection, and checks the returned rows.
// NOTE(review): Some("3.3.1") looks like the minimum Spark version this test runs on —
// confirm against testWithSpecifiedSparkVersion.
testWithSpecifiedSparkVersion("column mapping mode = id", Some("3.3.1")) {
  // The table is created exactly once, inside withTable, so the name is scoped to this
  // test (withTable is expected to drop delta_cm1 afterwards — verify against the helper).
  withTable("delta_cm1") {
    spark.sql(s"""
                 |create table delta_cm1 (id int, name string) using delta
                 |tblproperties ("delta.columnMapping.mode"= "id")
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_cm1 values (1, "v1"), (2, "v2")
                 |""".stripMargin)

    // Full scan: both inserted rows come back.
    val df1 = runQueryAndCompare("select * from delta_cm1") { _ => }
    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

    // Projection plus filter: only the name of the row with id = 2.
    val df2 = runQueryAndCompare("select name from delta_cm1 where id = 2") { _ => }
    checkLengthAndPlan(df2, 1)
    checkAnswer(df2, Row("v2") :: Nil)
  }
}

// Name column mapping is supported in Delta 2.0 (corresponding to Spark 3.2.0)
// Reads back a Delta table that uses name-based column mapping, once as a full scan and
// once as a filtered single-column projection, and checks the returned rows.
// NOTE(review): Some("3.2.0") looks like the minimum Spark version this test runs on —
// confirm against testWithSpecifiedSparkVersion.
testWithSpecifiedSparkVersion("column mapping mode = name", Some("3.2.0")) {
  // The table is created exactly once, inside withTable, so the name is scoped to this
  // test (withTable is expected to drop delta_cm2 afterwards — verify against the helper).
  withTable("delta_cm2") {
    spark.sql(s"""
                 |create table delta_cm2 (id int, name string) using delta
                 |tblproperties ("delta.columnMapping.mode"= "name")
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_cm2 values (1, "v1"), (2, "v2")
                 |""".stripMargin)

    // Full scan: both inserted rows come back.
    val df1 = runQueryAndCompare("select * from delta_cm2") { _ => }
    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

    // Projection plus filter: only the name of the row with id = 2.
    val df2 = runQueryAndCompare("select name from delta_cm2 where id = 2") { _ => }
    checkLengthAndPlan(df2, 1)
    checkAnswer(df2, Row("v2") :: Nil)
  }
}

// Queries earlier versions of a Delta table with `VERSION AS OF`. Two inserts follow the
// create; the assertions expect version 1 to hold the first batch only and version 2 to
// hold both batches.
test("delta: time travel") {
  // The table is created exactly once, inside withTable, so the version numbers asserted
  // below are not shifted by extra commits (withTable is expected to drop delta_tm
  // afterwards — verify against the helper).
  withTable("delta_tm") {
    spark.sql(s"""
                 |create table delta_tm (id int, name string) using delta
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_tm values (1, "v1"), (2, "v2")
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_tm values (3, "v3"), (4, "v4")
                 |""".stripMargin)

    // Version 1: state after the first insert.
    val df1 = runQueryAndCompare("select * from delta_tm VERSION AS OF 1") { _ => }
    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

    // Version 2: state after the second insert.
    val df2 = runQueryAndCompare("select * from delta_tm VERSION AS OF 2") { _ => }
    checkLengthAndPlan(df2, 4)
    checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil)

    // Time travel combined with a projection and a filter.
    val df3 = runQueryAndCompare("select name from delta_tm VERSION AS OF 2 where id = 2") {
      _ =>
    }
    checkLengthAndPlan(df3, 1)
    checkAnswer(df3, Row("v2") :: Nil)
  }
}

// Filters a partitioned Delta table on its partition column and checks that the scan
// node carries no data filters, then verifies the returned rows.
test("delta: partition filters") {
  // Scope the table to this test so delta_pf does not outlive it (withTable is expected
  // to drop the table afterwards — verify against the helper).
  withTable("delta_pf") {
    spark.sql(s"""
                 |create table delta_pf (id int, name string) using delta partitioned by (name)
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
                 |""".stripMargin)
    val df1 = runQueryAndCompare("select * from delta_pf where name = 'v1'") { _ => }

    val deltaScanTransformers = df1.queryExecution.executedPlan.collect {
      case f: FileSourceScanExecTransformer => f
    }
    // Fail with a readable message instead of a bare NoSuchElementException from `.head`.
    assert(
      deltaScanTransformers.nonEmpty,
      "expected a FileSourceScanExecTransformer in the executed plan")
    // No data filters as only partition filters exist
    assert(deltaScanTransformers.head.filterExprs().isEmpty)

    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
  }
}

// Runs a full scan and a filtered projection against a Delta table while
// `spark.databricks.delta.stats.skipping` is set to "false", and checks the returned rows.
test("basic test with stats.skipping disabled") {
  // withTable is the outer scope so the table is still cleaned up after the SQL conf
  // override has ended (withTable is expected to drop delta_test2 — verify against the
  // helper).
  withTable("delta_test2") {
    withSQLConf("spark.databricks.delta.stats.skipping" -> "false") {
      spark.sql(s"""
                   |create table delta_test2 (id int, name string) using delta
                   |""".stripMargin)
      spark.sql(s"""
                   |insert into delta_test2 values (1, "v1"), (2, "v2")
                   |""".stripMargin)

      // Full scan: both inserted rows come back.
      val df1 = runQueryAndCompare("select * from delta_test2") { _ => }
      checkLengthAndPlan(df1, 2)
      checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

      // Projection plus filter: only the name of the row with id = 2.
      val df2 = runQueryAndCompare("select name from delta_test2 where id = 2") { _ => }
      checkLengthAndPlan(df2, 1)
      checkAnswer(df2, Row("v2") :: Nil)
    }
  }
}

Expand Down

0 comments on commit c854b78

Please sign in to comment.