diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 07a9b0945c97..5b4e41bb60cc 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -22,17 +22,20 @@ import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.ConfigUtils
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.exception.{HoodieException, HoodieValidationException}
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
+import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
+import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.validateTableSchema
 import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
 import org.apache.spark.sql.types.StructType
 
@@ -76,6 +79,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
     }
 
     try {
+      validateTableSchema(table.schema, hoodieCatalogTable.tableSchemaWithoutMetaFields)
       // create catalog table for this hoodie table
       CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp)
     } catch {
@@ -88,6 +92,37 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 
 object CreateHoodieTableCommand {
 
+  def validateTableSchema(userDefinedSchema: StructType, hoodieTableSchema: StructType): Boolean = {
+    if (userDefinedSchema.fields.length != 0 &&
+      userDefinedSchema.fields.length != hoodieTableSchema.fields.length) {
+      false
+    } else if (userDefinedSchema.fields.length != 0) {
+      // Compare column by column, ignoring declaration order: a user-defined column
+      // matches if it has the same name and either a castable or Avro-identical type.
+      val sortedHoodieTableFields = hoodieTableSchema.fields.sortBy(_.name)
+      val sortedUserDefinedFields = userDefinedSchema.fields.sortBy(_.name)
+      val diffResult = sortedHoodieTableFields.zip(sortedUserDefinedFields).forall {
+        case (hoodieTableColumn, userDefinedColumn) =>
+          hoodieTableColumn.name.equals(userDefinedColumn.name) &&
+            (Cast.canCast(hoodieTableColumn.dataType, userDefinedColumn.dataType) ||
+              SchemaConverters.toAvroType(hoodieTableColumn.dataType)
+                .equals(SchemaConverters.toAvroType(userDefinedColumn.dataType)))
+      }
+      if (!diffResult) {
+        throw new HoodieValidationException(
+          s"The defined schema is inconsistent with the schema in the hoodie metadata directory," +
+            s" hoodieTableSchema: ${hoodieTableSchema.simpleString}," +
+            s" userDefinedSchema: ${userDefinedSchema.simpleString}")
+      } else {
+        true
+      }
+    } else {
+      // An empty user-defined schema means CREATE TABLE without a column list:
+      // the schema from the hoodie metadata directory is used as-is.
+      true
+    }
+  }
+
   def validateTblProperties(hoodieCatalogTable: HoodieCatalogTable): Unit = {
     if (hoodieCatalogTable.hoodieTableExists) {
       val originTableConfig = hoodieCatalogTable.tableConfig.getProps.asScala.toMap
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index cb558b66b94a..1d65002e3743 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory
 
 import java.io.File
 import java.util.TimeZone
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
 class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
@@ -70,7 +71,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     .config(sparkConf())
     .getOrCreate()
 
-  private var tableId = 0
+  private val tableId = new AtomicInteger(0)
 
   private var extraConf = Map[String, String]()
 
@@ -113,9 +114,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   }
 
   protected def generateTableName: String = {
-    val name = s"h$tableId"
-    tableId = tableId + 1
-    name
+    s"h${tableId.incrementAndGet()}"
   }
 
   override protected def afterAll(): Unit = {
@@ -215,7 +214,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     try {
       f(tableName)
     } finally {
-      spark.sql(s"drop table if exists $tableName")
+      spark.sql(s"drop table if exists $tableName purge")
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index 28c633ee95bf..12c0d6483380 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -1513,4 +1513,47 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  test("Test Create Table With Inconsistent Schemas") {
+    withTempDir { tmp =>
+      val parentPath = tmp.getCanonicalPath
+
+      // test the case of recreating the same table after a schema change
+      val tableName1 = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName1 (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  type = 'cow'
+           | )
+           | location '$parentPath/$tableName1'
+       """.stripMargin)
+      spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
+      spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
+      spark.sql(s"drop table $tableName1")
+
+      // recreating the table with an incompatible type for 'name' must fail validation
+      checkExceptionContain(
+        s"""
+           |create table $tableName1 (
+           |  id int,
+           |  name map<string, string>,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  type = 'cow'
+           | )
+           | location '$parentPath/$tableName1'
+       """.stripMargin
+      )("Failed to create catalog table in metastore")
+    }
+  }
 }
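
As a quick illustration of the new check (not part of the patch): the sketch below exercises validateTableSchema directly. It assumes the patch is applied and that spark-sql, spark-avro, and hudi-spark are on the classpath; the object name SchemaValidationSketch and the sample schemas are made up for illustration.

    import org.apache.hudi.exception.HoodieValidationException
    import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.validateTableSchema
    import org.apache.spark.sql.types._

    object SchemaValidationSketch {
      def main(args: Array[String]): Unit = {
        // Schema recorded in the Hudi metadata directory of an existing table.
        val hoodieTableSchema = StructType(Seq(
          StructField("id", IntegerType),
          StructField("name", StringType),
          StructField("ts", LongType)))

        // An empty user schema (CREATE TABLE without a column list) always passes.
        assert(validateTableSchema(StructType(Nil), hoodieTableSchema))

        // Column order is irrelevant: both sides are sorted by field name first.
        val reordered = StructType(Seq(
          StructField("ts", LongType),
          StructField("id", IntegerType),
          StructField("name", StringType)))
        assert(validateTableSchema(reordered, hoodieTableSchema))

        // Same field count but an incompatible type: string is neither castable to
        // map<string, string> nor equal to it after Avro conversion, so this throws.
        val incompatible = StructType(Seq(
          StructField("id", IntegerType),
          StructField("name", MapType(StringType, StringType)),
          StructField("ts", LongType)))
        try {
          validateTableSchema(incompatible, hoodieTableSchema)
        } catch {
          case e: HoodieValidationException => println(s"rejected as expected: ${e.getMessage}")
        }
      }
    }

One design point worth noting: a field-count mismatch makes validateTableSchema return false (a result the call site currently discards), while a per-column name or type mismatch throws HoodieValidationException, which the command's catch block converts into the "Failed to create catalog table in metastore" error the new test asserts on.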