Skip to content

Commit

Permalink
Improve exception in resolveQueryColumnsByName
Browse files Browse the repository at this point in the history
  • Loading branch information
c27kwan committed Jan 22, 2024
1 parent a936597 commit 2b91316
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -872,17 +872,19 @@ class DeltaAnalysis(session: SparkSession)
private def resolveQueryColumnsByName(
query: LogicalPlan, targetAttrs: Seq[Attribute], deltaTable: DeltaTableV2): LogicalPlan = {
insertIntoByNameMissingColumn(query, targetAttrs, deltaTable)
// Spark will resolve columns to make sure specified columns are in the table schema and don't
// have duplicates. This is just a sanity check.
assert(
query.output.length <= targetAttrs.length,
s"Too many specified columns ${query.output.map(_.name).mkString(", ")}. " +
s"Table columns: ${targetAttrs.map(_.name).mkString(", ")}")

// This is called before resolveOutputColumns in postHocResolutionRules, so we need to duplicate
// the schema validation here.
if (query.output.length > targetAttrs.length) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tableName = deltaTable.name(),
expected = targetAttrs.map(_.name),
queryOutput = query.output)
}

val project = query.output.map { attr =>
val targetAttr = targetAttrs.find(t => session.sessionState.conf.resolver(t.name, attr.name))
.getOrElse {
// This is a sanity check. Spark should have done the check.
throw DeltaErrors.missingColumn(attr, targetAttrs)
}
addCastToColumn(attr, targetAttr, deltaTable.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,35 @@ class DeltaInsertIntoSQLSuite
}
}

test("insertInto should throw an AnalysisError on name mismatch") {
  // NOTE: Upper-case column names in the target schema make
  // needsSchemaAdjustmentByName return true (case sensitivity), which routes
  // the insert through resolveQueryColumnsByName — the code path under test.
  val mismatchCases = Seq(
    // Fewer target columns than query columns: arity mismatch error.
    ("(A int)", "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS"),
    // Same arity, but column `b` has no match in the target: missing column error.
    ("(A int, c int)", "DELTA_MISSING_COLUMN")
  )
  val format = "delta"
  mismatchCases.foreach { case (targetSchema, expectedErrorClass) =>
    val src = "source"
    val tgt = "target"
    withTable(src, tgt) {
      sql(s"CREATE TABLE $src (a int, b int) USING $format")
      sql(s"CREATE TABLE $tgt $targetSchema USING $format")
      // INSERT ... BY NAME must fail analysis with the expected error class.
      val caught = intercept[AnalysisException] {
        sql(s"INSERT INTO $tgt BY NAME SELECT * FROM $src")
      }
      assert(caught.getErrorClass === expectedErrorClass)
    }
  }
}

dynamicOverwriteTest("insertInto: dynamic overwrite by name") {
import testImplicits._
val t1 = "tbl"
Expand Down

0 comments on commit 2b91316

Please sign in to comment.