Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] minor change for delta ut #4869

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,93 +44,105 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {

// IdMapping is supported in Delta 2.2 (related to Spark3.3.1)
testWithSpecifiedSparkVersion("column mapping mode = id", Some("3.3.1")) {
  // The table is created exactly once, inside withTable. A copy of these statements
  // outside the wrapper would create delta_cm1 before the wrapped `create table` runs.
  // NOTE(review): withTable is assumed to be Spark's SQLTestUtils helper that drops the
  // named table once the body finishes - confirm against the suite's base class.
  withTable("delta_cm1") {
    spark.sql(s"""
                 |create table delta_cm1 (id int, name string) using delta
                 |tblproperties ("delta.columnMapping.mode"= "id")
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_cm1 values (1, "v1"), (2, "v2")
                 |""".stripMargin)

    // Full scan: both inserted rows are expected back.
    val df1 = runQueryAndCompare("select * from delta_cm1") { _ => }
    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

    // Projection plus filter: only the row with id = 2 is expected.
    val df2 = runQueryAndCompare("select name from delta_cm1 where id = 2") { _ => }
    checkLengthAndPlan(df2, 1)
    checkAnswer(df2, Row("v2") :: Nil)
  }
}

// NameMapping is supported in Delta 2.0 (related to Spark3.2.0)
testWithSpecifiedSparkVersion("column mapping mode = name", Some("3.2.0")) {
  // The table is created exactly once, inside withTable. A copy of these statements
  // outside the wrapper would create delta_cm2 before the wrapped `create table` runs.
  // NOTE(review): withTable is assumed to be Spark's SQLTestUtils helper that drops the
  // named table once the body finishes - confirm against the suite's base class.
  withTable("delta_cm2") {
    spark.sql(s"""
                 |create table delta_cm2 (id int, name string) using delta
                 |tblproperties ("delta.columnMapping.mode"= "name")
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_cm2 values (1, "v1"), (2, "v2")
                 |""".stripMargin)

    // Full scan: both inserted rows are expected back.
    val df1 = runQueryAndCompare("select * from delta_cm2") { _ => }
    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

    // Projection plus filter: only the row with id = 2 is expected.
    val df2 = runQueryAndCompare("select name from delta_cm2 where id = 2") { _ => }
    checkLengthAndPlan(df2, 1)
    checkAnswer(df2, Row("v2") :: Nil)
  }
}

test("delta: time travel") {
  // The table is created and populated exactly once, inside withTable. The assertions
  // below expect version 1 to hold only the first insert and version 2 to hold both, so
  // any extra create/insert outside the wrapper would break that numbering.
  // NOTE(review): withTable is assumed to be Spark's SQLTestUtils helper that drops the
  // named table once the body finishes - confirm against the suite's base class.
  withTable("delta_tm") {
    spark.sql(s"""
                 |create table delta_tm (id int, name string) using delta
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_tm values (1, "v1"), (2, "v2")
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_tm values (3, "v3"), (4, "v4")
                 |""".stripMargin)

    // Snapshot after the first insert.
    val df1 = runQueryAndCompare("select * from delta_tm VERSION AS OF 1") { _ => }
    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

    // Snapshot after the second insert.
    val df2 = runQueryAndCompare("select * from delta_tm VERSION AS OF 2") { _ => }
    checkLengthAndPlan(df2, 4)
    checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil)

    // Time travel combined with a projection and a filter.
    val df3 = runQueryAndCompare("select name from delta_tm VERSION AS OF 2 where id = 2") {
      _ =>
    }
    checkLengthAndPlan(df3, 1)
    checkAnswer(df3, Row("v2") :: Nil)
  }
}

test("delta: partition filters") {
  // NOTE(review): withTable is assumed to be Spark's SQLTestUtils helper that drops the
  // named table once the body finishes - confirm against the suite's base class.
  withTable("delta_pf") {
    spark.sql(s"""
                 |create table delta_pf (id int, name string) using delta partitioned by (name)
                 |""".stripMargin)
    spark.sql(s"""
                 |insert into delta_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
                 |""".stripMargin)

    // Filter on the partition column only.
    val df1 = runQueryAndCompare("select * from delta_pf where name = 'v1'") { _ => }
    val deltaScanTransformer = df1.queryExecution.executedPlan.collect {
      case f: FileSourceScanExecTransformer => f
    }.head
    // No data filters as only partition filters exist
    assert(deltaScanTransformer.filterExprs().size == 0)
    checkLengthAndPlan(df1, 2)
    checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
  }
}

test("basic test with stats.skipping disabled") {
  // withTable is the outer wrapper, so the SQL conf override applies only to the
  // statements inside it.
  withTable("delta_test2") {
    withSQLConf("spark.databricks.delta.stats.skipping" -> "false") {
      spark.sql(s"""
                   |create table delta_test2 (id int, name string) using delta
                   |""".stripMargin)
      spark.sql(s"""
                   |insert into delta_test2 values (1, "v1"), (2, "v2")
                   |""".stripMargin)

      // Full scan: both inserted rows are expected back.
      val df1 = runQueryAndCompare("select * from delta_test2") { _ => }
      checkLengthAndPlan(df1, 2)
      checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)

      // Projection plus filter: only the row with id = 2 is expected.
      val df2 = runQueryAndCompare("select name from delta_test2 where id = 2") { _ => }
      checkLengthAndPlan(df2, 1)
      checkAnswer(df2, Row("v2") :: Nil)
    }
  }
}

Expand Down
Loading