diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 940c16d21a..c882fd5fe0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -872,17 +872,19 @@ class DeltaAnalysis(session: SparkSession)
   private def resolveQueryColumnsByName(
       query: LogicalPlan, targetAttrs: Seq[Attribute], deltaTable: DeltaTableV2): LogicalPlan = {
     insertIntoByNameMissingColumn(query, targetAttrs, deltaTable)
-    // Spark will resolve columns to make sure specified columns are in the table schema and don't
-    // have duplicates. This is just a sanity check.
-    assert(
-      query.output.length <= targetAttrs.length,
-      s"Too many specified columns ${query.output.map(_.name).mkString(", ")}. " +
-        s"Table columns: ${targetAttrs.map(_.name).mkString(", ")}")
+
+    // This is called before resolveOutputColumns in postHocResolutionRules, so we need to duplicate
+    // the schema validation here.
+    if (query.output.length > targetAttrs.length) {
+      throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
+        tableName = deltaTable.name(),
+        expected = targetAttrs.map(_.name),
+        queryOutput = query.output)
+    }
     val project = query.output.map { attr =>
       val targetAttr = targetAttrs.find(t => session.sessionState.conf.resolver(t.name, attr.name))
         .getOrElse {
-          // This is a sanity check. Spark should have done the check.
           throw DeltaErrors.missingColumn(attr, targetAttrs)
         }
       addCastToColumn(attr, targetAttr, deltaTable.name())
     }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
index deacbd1f50..c98fc35fac 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
@@ -134,6 +134,35 @@ class DeltaInsertIntoSQLSuite
     }
   }
 
+  test("insertInto should throw an AnalysisException on name mismatch") {
+    def testInsertByNameError(targetSchema: String, expectedErrorClass: String): Unit = {
+      val sourceTableName = "source"
+      val targetTableName = "target"
+      val format = "delta"
+      withTable(sourceTableName, targetTableName) {
+        sql(s"CREATE TABLE $sourceTableName (a int, b int) USING $format")
+        sql(s"CREATE TABLE $targetTableName $targetSchema USING $format")
+        val e = intercept[AnalysisException] {
+          sql(s"INSERT INTO $targetTableName BY NAME SELECT * FROM $sourceTableName")
+        }
+        assert(e.getErrorClass === expectedErrorClass)
+      }
+    }
+
+    // NOTE: We use upper case in the target schema so that needsSchemaAdjustmentByName returns
+    // true (due to the case difference) and we go through resolveQueryColumnsByName, hitting the
+    // code path under test.
+
+    // When the number of columns does not match, throw an arity mismatch error.
+    testInsertByNameError(
+      targetSchema = "(A int)",
+      expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS")
+
+    // When the number of columns matches but the names do not, throw a missing column error.
+    testInsertByNameError(
+      targetSchema = "(A int, c int)", expectedErrorClass = "DELTA_MISSING_COLUMN")
+  }
+
   dynamicOverwriteTest("insertInto: dynamic overwrite by name") {
     import testImplicits._
     val t1 = "tbl"
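
For reference, a minimal sketch of how the new behavior surfaces to users (not part of the patch; it assumes a SparkSession named spark with the Delta Lake extensions configured):

// Illustrative sketch only; assumes `spark` is a SparkSession with Delta Lake enabled.
import org.apache.spark.sql.AnalysisException

spark.sql("CREATE TABLE source (a INT, b INT) USING delta")
spark.sql("CREATE TABLE target (A INT) USING delta")

try {
  // Two source columns written by name into a one-column target: previously this tripped the
  // internal assert in resolveQueryColumnsByName; with this change it surfaces as an
  // AnalysisException carrying a proper error class.
  spark.sql("INSERT INTO target BY NAME SELECT * FROM source")
} catch {
  case e: AnalysisException =>
    println(e.getErrorClass) // INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS
}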