[Spark] Add INSERT tests with missing, extra, reordered columns/fields (#3762)

## Description
Follow-up to #3605

Adds more tests covering the behavior of all the ways of running an insert with:
- an extra column or struct field in the input, in
`DeltaInsertIntoSchemaEvolutionSuite`
- a missing column or struct field in the input, in
`DeltaInsertIntoImplicitCastSuite`
- a different column or field ordering than the table schema, in
`DeltaInsertIntoColumnOrderSuite`

Note: the tests are spread across multiple suites because each test case covers
20 different ways of running an insert, which quickly leads to large test suites.

This change includes improvements to `DeltaInsertIntoTest`:
- Group all types of inserts into categories that are easier to reference in tests:
  - SQL vs. Dataframe inserts
  - Position-based vs. name-based inserts
  - Append vs. overwrite
- Provide a mechanism to ensure that each test covers all existing insert types (see the sketch below).
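
For illustration, the grouping and the coverage check could look like the following minimal sketch. This is hypothetical and condensed to a few insert types (`SQLInsertByPosition`, `register`, and `InsertCategoriesSketch` are invented names); the real definitions live in `DeltaInsertIntoTest` and cover all 20 kinds of inserts:

```scala
import org.apache.spark.sql.SaveMode

// Hypothetical, condensed sketch: names mirror the test code below, but the
// real definitions live in DeltaInsertIntoTest and cover all 20 insert types.
object InsertCategoriesSketch {
  sealed trait Insert
  case class SQLInsertByPosition(mode: SaveMode) extends Insert
  case class SQLInsertColList(mode: SaveMode) extends Insert
  case class DFv1Save(mode: SaveMode) extends Insert
  case object DFv2Append extends Insert
  case object StreamingInsert extends Insert

  // Orthogonal categories that tests combine with set algebra, e.g.
  // insertsDataframe.intersect(insertsByName) - StreamingInsert.
  val insertsSQL: Set[Insert] = Set(
    SQLInsertByPosition(SaveMode.Append), SQLInsertByPosition(SaveMode.Overwrite),
    SQLInsertColList(SaveMode.Append), SQLInsertColList(SaveMode.Overwrite))
  val insertsDataframe: Set[Insert] = Set(
    DFv1Save(SaveMode.Append), DFv1Save(SaveMode.Overwrite), DFv2Append, StreamingInsert)
  val allInsertTypes: Set[Insert] = insertsSQL ++ insertsDataframe

  // Coverage mechanism: each test registers the insert types it exercises, and
  // a dedicated test asserts that nothing was left out.
  private var covered: Set[Insert] = Set.empty
  def register(inserts: Set[Insert]): Unit = covered ++= inserts
  def checkAllTestCasesImplemented(): Unit =
    assert(covered == allInsertTypes, s"Uncovered insert types: ${allInsertTypes -- covered}")
}
```

Tests then pick subsets with plain set operations, and each suite's "all test cases are implemented" test calls `checkAllTestCasesImplemented()`.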

## How was this patch tested?
N/A: test only
johanl-db authored Oct 28, 2024
1 parent 223894f commit 6599b40
Showing 8 changed files with 874 additions and 207 deletions.
DeltaInsertIntoColumnOrderSuite.scala
@@ -0,0 +1,170 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.internal.SQLConf

/**
 * Test suite covering INSERT operations with columns or struct fields ordered differently than in
 * the table schema.
 */
class DeltaInsertIntoColumnOrderSuite extends DeltaInsertIntoTest {

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark.conf.set(DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key, "false")
    spark.conf.set(SQLConf.ANSI_ENABLED.key, "true")
  }

  test("all test cases are implemented") {
    checkAllTestCasesImplemented()
  }

  // Inserting using a different ordering for top-level columns behaves as one would expect:
  // inserts by position resolve columns based on position, inserts by name resolve based on name.
  // Whether additional handling is required to add implicit casts doesn't impact this behavior.
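  // For example (hypothetical SQL for illustration, not generated by this suite), with a
  // table t(a int, b int, c int):
  //   INSERT INTO t VALUES (1, 4, 5)            -- by position: a = 1, b = 4, c = 5
  //   INSERT INTO t (a, c, b) VALUES (1, 4, 5)  -- by name: a = 1, c = 4, b = 5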
  for { (inserts, expectedAnswer) <- Seq(
      insertsByPosition.intersect(insertsAppend) ->
        TestData("a int, b int, c int",
          Seq("""{ "a": 1, "b": 2, "c": 3 }""", """{ "a": 1, "b": 4, "c": 5 }""")),
      insertsByPosition.intersect(insertsOverwrite) ->
        TestData("a int, b int, c int", Seq("""{ "a": 1, "b": 4, "c": 5 }""")),
      insertsByName.intersect(insertsAppend) ->
        TestData("a int, b int, c int",
          Seq("""{ "a": 1, "b": 2, "c": 3 }""", """{ "a": 1, "b": 5, "c": 4 }""")),
      insertsByName.intersect(insertsOverwrite) ->
        TestData("a int, b int, c int", Seq("""{ "a": 1, "b": 5, "c": 4 }"""))
    )
  } {
testInserts(s"insert with different top-level column ordering")(
initialData = TestData("a int, b int, c int", Seq("""{ "a": 1, "b": 2, "c": 3 }""")),
partitionBy = Seq("a"),
overwriteWhere = "a" -> 1,
insertData = TestData("a int, c int, b int", Seq("""{ "a": 1, "c": 4, "b": 5 }""")),
expectedResult = ExpectedResult.Success(expectedAnswer),
includeInserts = inserts
)

testInserts(s"insert with implicit cast and different top-level column ordering")(
initialData = TestData("a int, b int, c int", Seq("""{ "a": 1, "b": 2, "c": 3 }""")),
partitionBy = Seq("a"),
overwriteWhere = "a" -> 1,
insertData = TestData("a long, c int, b int", Seq("""{ "a": 1, "c": 4, "b": 5 }""")),
expectedResult = ExpectedResult.Success(expectedAnswer),
// Dataframe insert by name don't support implicit cast, see negative test below.
includeInserts = inserts -- insertsByName.intersect(insertsDataframe)
)
}

testInserts(s"insert with implicit cast and different top-level column ordering")(
initialData = TestData("a int, b int, c int", Seq("""{ "a": 1, "b": 2, "c": 3 }""")),
partitionBy = Seq("a"),
overwriteWhere = "a" -> 1,
insertData = TestData("a long, c int, b int", Seq("""{ "a": 1, "c": 4, "b": 4 }""")),
expectedResult = ExpectedResult.Failure(ex => {
checkError(
ex,
"DELTA_FAILED_TO_MERGE_FIELDS",
parameters = Map(
"currentField" -> "a",
"updateField" -> "a"
))}),
includeInserts = insertsByName.intersect(insertsDataframe)
)

  // Inserting using a different ordering for struct fields is full of surprises...
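  // For example (hypothetical SQL for illustration), with a table t(a int, s struct<x int, y int>):
  //   INSERT INTO t VALUES (1, named_struct('y', 5, 'x', 4))         -- fields matched by name: x = 4, y = 5
  //   INSERT INTO t (a, s) VALUES (1, named_struct('y', 5, 'x', 4))  -- fields matched by position: x = 5, y = 4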
  for { (inserts: Set[Insert], expectedAnswer) <- Seq(
      // Most inserts use name based resolution for struct fields when there's no implicit cast
      // required due to mismatching data types, except for `INSERT INTO/OVERWRITE (columns)` and
      // `INSERT OVERWRITE PARTITION (partition) (columns)` which use position based resolution -
      // even though these are by name inserts.
      insertsAppend - SQLInsertColList(SaveMode.Append) ->
        TestData("a int, s struct <x: int, y: int>",
          Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""", """{ "a": 1, "s": { "x": 4, "y": 5 } }""")),
      insertsOverwrite - SQLInsertColList(SaveMode.Overwrite) - SQLInsertOverwritePartitionColList ->
        TestData("a int, s struct <x: int, y: int>", Seq("""{ "a": 1, "s": { "x": 4, "y": 5 } }""")),
      Set(SQLInsertColList(SaveMode.Append)) ->
        TestData("a int, s struct <x: int, y: int>",
          Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""", """{ "a": 1, "s": { "x": 5, "y": 4 } }""")),
      Set(SQLInsertColList(SaveMode.Overwrite), SQLInsertOverwritePartitionColList) ->
        TestData("a int, s struct <x: int, y: int>", Seq("""{ "a": 1, "s": { "x": 5, "y": 4 } }"""))
    )
  } {
testInserts(s"insert with different struct fields ordering")(
initialData = TestData(
"a int, s struct <x: int, y int>",
Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""")),
partitionBy = Seq("a"),
overwriteWhere = "a" -> 1,
insertData = TestData("a int, s struct <y int, x: int>",
Seq("""{ "a": 1, "s": { "y": 5, "x": 4 } }""")),
expectedResult = ExpectedResult.Success(expectedAnswer),
includeInserts = inserts
)
}

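  // For example (hypothetical, extending the SQL illustration above): if the input provides
  // `a` as a long while the table stores `a int`, an implicit cast is required, and
  //   INSERT INTO t VALUES (CAST(1 AS LONG), named_struct('y', 5, 'x', 4))
  // then resolves the struct fields by position (x = 5, y = 4) instead of by name.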
  for { (inserts: Set[Insert], expectedAnswer) <- Seq(
      // When there's a type mismatch and an implicit cast is required, then all inserts use
      // position based resolution for struct fields, except for
      // `INSERT OVERWRITE PARTITION (partition)` which uses name based resolution, and dataframe
      // inserts by name which don't support implicit casts and fail - see negative test below.
      insertsAppend - StreamingInsert ->
        TestData("a int, s struct <x: int, y: int>",
          Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""", """{ "a": 1, "s": { "x": 5, "y": 4 } }""")),
      insertsOverwrite - SQLInsertOverwritePartitionByPosition ->
        TestData("a int, s struct <x: int, y: int>", Seq("""{ "a": 1, "s": { "x": 5, "y": 4 } }""")),
      Set(SQLInsertOverwritePartitionByPosition) ->
        TestData("a int, s struct <x: int, y: int>", Seq("""{ "a": 1, "s": { "x": 4, "y": 5 } }"""))
    )
  } {
testInserts(s"insert with implicit cast and different struct fields ordering")(
initialData = TestData(
"a int, s struct <x: int, y int>",
Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""")),
partitionBy = Seq("a"),
overwriteWhere = "a" -> 1,
insertData = TestData("a long, s struct <y int, x: int>",
Seq("""{ "a": 1, "s": { "y": 5, "x": 4 } }""")),
expectedResult = ExpectedResult.Success(expectedAnswer),
includeInserts = inserts -- insertsDataframe.intersect(insertsByName)
)
}

testInserts(s"insert with implicit cast and different struct fields ordering")(
initialData = TestData(
"a int, s struct <x: int, y int>",
Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""")),
partitionBy = Seq("a"),
overwriteWhere = "a" -> 1,
insertData = TestData("a long, s struct <y int, x: int>",
Seq("""{ "a": 1, "s": { "y": 5, "x": 4 } }""")),
expectedResult = ExpectedResult.Failure(ex => {
checkError(
ex,
"DELTA_FAILED_TO_MERGE_FIELDS",
parameters = Map(
"currentField" -> "a",
"updateField" -> "a"
))}),
includeInserts = insertsDataframe.intersect(insertsByName)
)
}
DeltaInsertIntoImplicitCastSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.SaveMode
+
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -38,42 +38,34 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
     spark.conf.set(SQLConf.ANSI_ENABLED.key, "true")
   }
 
+  test("all test cases are implemented") {
+    checkAllTestCasesImplemented()
+  }
+
   for (schemaEvolution <- BOOLEAN_DOMAIN) {
     testInserts("insert with implicit up and down cast on top-level fields, " +
       s"schemaEvolution=$schemaEvolution")(
-      initialSchemaDDL = "a long, b int",
-      initialJsonData = Seq("""{ "a": 1, "b": 2 }"""),
+      initialData = TestData("a long, b int", Seq("""{ "a": 1, "b": 2 }""")),
       partitionBy = Seq("a"),
       overwriteWhere = "a" -> 1,
-      insertSchemaDDL = "a int, b long",
-      insertJsonData = Seq("""{ "a": 1, "b": 4 }"""),
+      insertData = TestData("a int, b long", Seq("""{ "a": 1, "b": 4 }""")),
       expectedResult = ExpectedResult.Success(
         expected = new StructType()
           .add("a", LongType)
           .add("b", IntegerType)),
       // The following insert operations don't implicitly cast the data but fail instead - see
       // following test covering failure for these cases. We should change this to offer consistent
       // behavior across all inserts.
-      excludeInserts = Seq(
-        DFv1SaveAsTable(SaveMode.Append),
-        DFv1SaveAsTable(SaveMode.Overwrite),
-        DFv1Save(SaveMode.Append),
-        DFv1Save(SaveMode.Overwrite),
-        DFv2Append,
-        DFv2Overwrite,
-        DFv2OverwritePartition
-      ),
+      excludeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
       confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
     )
 
     testInserts("insert with implicit up and down cast on top-level fields, " +
       s"schemaEvolution=$schemaEvolution")(
-      initialSchemaDDL = "a long, b int",
-      initialJsonData = Seq("""{ "a": 1, "b": 2 }"""),
+      initialData = TestData("a long, b int", Seq("""{ "a": 1, "b": 2 }""")),
       partitionBy = Seq("a"),
       overwriteWhere = "a" -> 1,
-      insertSchemaDDL = "a int, b long",
-      insertJsonData = Seq("""{ "a": 1, "b": 4 }"""),
+      insertData = TestData("a int, b long", Seq("""{ "a": 1, "b": 4 }""")),
       expectedResult = ExpectedResult.Failure { ex =>
         checkError(
           ex,
@@ -83,26 +75,18 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
           "updateField" -> "a"
         ))
       },
-      includeInserts = Seq(
-        DFv1SaveAsTable(SaveMode.Append),
-        DFv1SaveAsTable(SaveMode.Overwrite),
-        DFv1Save(SaveMode.Append),
-        DFv1Save(SaveMode.Overwrite),
-        DFv2Append,
-        DFv2Overwrite,
-        DFv2OverwritePartition
-      ),
+      includeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
       confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
     )
 
     testInserts("insert with implicit up and down cast on fields nested in array, " +
       s"schemaEvolution=$schemaEvolution")(
-      initialSchemaDDL = "key int, a array<struct<x: long, y: int>>",
-      initialJsonData = Seq("""{ "key": 1, "a": [ { "x": 1, "y": 2 } ] }"""),
+      initialData = TestData("key int, a array<struct<x: long, y: int>>",
+        Seq("""{ "key": 1, "a": [ { "x": 1, "y": 2 } ] }""")),
       partitionBy = Seq("key"),
       overwriteWhere = "key" -> 1,
-      insertSchemaDDL = "key int, a array<struct<x: int, y: long>>",
-      insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""),
+      insertData = TestData("key int, a array<struct<x: int, y: long>>",
+        Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }""")),
       expectedResult = ExpectedResult.Success(
         expected = new StructType()
           .add("key", IntegerType)
@@ -112,26 +96,18 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
       // The following insert operations don't implicitly cast the data but fail instead - see
       // following test covering failure for these cases. We should change this to offer consistent
       // behavior across all inserts.
-      excludeInserts = Seq(
-        DFv1SaveAsTable(SaveMode.Append),
-        DFv1SaveAsTable(SaveMode.Overwrite),
-        DFv1Save(SaveMode.Append),
-        DFv1Save(SaveMode.Overwrite),
-        DFv2Append,
-        DFv2Overwrite,
-        DFv2OverwritePartition
-      ),
+      excludeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
       confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
     )
 
     testInserts("insert with implicit up and down cast on fields nested in array, " +
       s"schemaEvolution=$schemaEvolution")(
-      initialSchemaDDL = "key int, a array<struct<x: long, y: int>>",
-      initialJsonData = Seq("""{ "key": 1, "a": [ { "x": 1, "y": 2 } ] }"""),
+      initialData = TestData("key int, a array<struct<x: long, y: int>>",
+        Seq("""{ "key": 1, "a": [ { "x": 1, "y": 2 } ] }""")),
       partitionBy = Seq("key"),
       overwriteWhere = "key" -> 1,
-      insertSchemaDDL = "key int, a array<struct<x: int, y: long>>",
-      insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""),
+      insertData = TestData("key int, a array<struct<x: int, y: long>>",
+        Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }""")),
       expectedResult = ExpectedResult.Failure { ex =>
         checkError(
           ex,
@@ -141,26 +117,18 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
           "updateField" -> "a"
         ))
       },
-      includeInserts = Seq(
-        DFv1SaveAsTable(SaveMode.Append),
-        DFv1SaveAsTable(SaveMode.Overwrite),
-        DFv1Save(SaveMode.Append),
-        DFv1Save(SaveMode.Overwrite),
-        DFv2Append,
-        DFv2Overwrite,
-        DFv2OverwritePartition
-      ),
+      includeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
       confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
     )
 
     testInserts("insert with implicit up and down cast on fields nested in map, " +
       s"schemaEvolution=$schemaEvolution")(
-      initialSchemaDDL = "key int, m map<string, struct<x: long, y: int>>",
-      initialJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }"""),
+      initialData = TestData("key int, m map<string, struct<x: long, y: int>>",
+        Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }""")),
       partitionBy = Seq("key"),
       overwriteWhere = "key" -> 1,
-      insertSchemaDDL = "key int, m map<string, struct<x: int, y: long>>",
-      insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""),
+      insertData = TestData("key int, m map<string, struct<x: int, y: long>>",
+        Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }""")),
       expectedResult = ExpectedResult.Success(
         expected = new StructType()
           .add("key", IntegerType)
@@ -170,26 +138,18 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
       // The following insert operations don't implicitly cast the data but fail instead - see
       // following test covering failure for these cases. We should change this to offer consistent
       // behavior across all inserts.
-      excludeInserts = Seq(
-        DFv1SaveAsTable(SaveMode.Append),
-        DFv1SaveAsTable(SaveMode.Overwrite),
-        DFv1Save(SaveMode.Append),
-        DFv1Save(SaveMode.Overwrite),
-        DFv2Append,
-        DFv2Overwrite,
-        DFv2OverwritePartition
-      ),
+      excludeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
       confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
     )
 
     testInserts("insert with implicit up and down cast on fields nested in map, " +
       s"schemaEvolution=$schemaEvolution")(
-      initialSchemaDDL = "key int, m map<string, struct<x: long, y: int>>",
-      initialJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }"""),
+      initialData = TestData("key int, m map<string, struct<x: long, y: int>>",
+        Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }""")),
       partitionBy = Seq("key"),
       overwriteWhere = "key" -> 1,
-      insertSchemaDDL = "key int, m map<string, struct<x: int, y: long>>",
-      insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""),
+      insertData = TestData("key int, m map<string, struct<x: int, y: long>>",
+        Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }""")),
       expectedResult = ExpectedResult.Failure { ex =>
         checkError(
           ex,
@@ -199,15 +159,7 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
           "updateField" -> "m"
         ))
       },
-      includeInserts = Seq(
-        DFv1SaveAsTable(SaveMode.Append),
-        DFv1SaveAsTable(SaveMode.Overwrite),
-        DFv1Save(SaveMode.Append),
-        DFv1Save(SaveMode.Overwrite),
-        DFv2Append,
-        DFv2Overwrite,
-        DFv2OverwritePartition
-      ),
+      includeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
       confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
     )
   }
Expand Down
Loading
