Skip to content

Commit

Permalink
[HUDI-8160] Verify the consistency of the user-defined schema and the…
Browse files Browse the repository at this point in the history
… existing hoodie schema when creating the hoodie table (#11869)
  • Loading branch information
huangxiaopingRD committed Sep 30, 2024
1 parent 7bbe1a4 commit 4e98278
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.exception.{HoodieException, HoodieValidationException}
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.validateTableSchema
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -76,6 +79,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
}

try {
validateTableSchema(table.schema, hoodieCatalogTable.tableSchemaWithoutMetaFields)
// create catalog table for this hoodie table
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp)
} catch {
Expand All @@ -88,6 +92,33 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean

object CreateHoodieTableCommand {

/**
 * Verifies that the schema the user declared in CREATE TABLE is consistent with
 * the schema already persisted in the hoodie metadata directory.
 *
 * @param userDefinedSchema schema from the CREATE TABLE statement (may be empty
 *                          when no columns were declared, e.g. `CREATE TABLE ... LOCATION ...`)
 * @param hoodieTableSchema existing table schema (without meta fields) read from
 *                          the hoodie metadata directory
 * @return true when the schemas are consistent (or there is nothing to check)
 * @throws HoodieValidationException when the two schemas are inconsistent
 */
def validateTableSchema(userDefinedSchema: StructType, hoodieTableSchema: StructType): Boolean = {
  // Nothing to validate when the user declared no columns explicitly.
  if (userDefinedSchema.fields.isEmpty) {
    true
  } else if (userDefinedSchema.fields.length != hoodieTableSchema.fields.length) {
    // A differing column count is itself an inconsistency. Throw instead of
    // returning false: the caller ignores the Boolean result, so a silent
    // `false` would let an inconsistent CREATE TABLE proceed.
    throw new HoodieValidationException(
      s"The defined schema is inconsistent with the schema in the hoodie metadata directory," +
        s" hoodieTableSchema: ${hoodieTableSchema.simpleString}," +
        s" userDefinedSchema: ${userDefinedSchema.simpleString}")
  } else {
    // Compare column-by-column after sorting by name so declaration order does
    // not matter. Two columns match when the names are equal and the types are
    // either cast-compatible or map to the same Avro type.
    val sortedHoodieTableFields = hoodieTableSchema.fields.sortBy(_.name)
    val sortedUserDefinedFields = userDefinedSchema.fields.sortBy(_.name)
    val isConsistent = sortedHoodieTableFields.zip(sortedUserDefinedFields).forall {
      case (hoodieTableColumn, userDefinedColumn) =>
        hoodieTableColumn.name.equals(userDefinedColumn.name) &&
          (Cast.canCast(hoodieTableColumn.dataType, userDefinedColumn.dataType) ||
            SchemaConverters.toAvroType(hoodieTableColumn.dataType)
              .equals(SchemaConverters.toAvroType(userDefinedColumn.dataType)))
    }
    if (!isConsistent) {
      throw new HoodieValidationException(
        s"The defined schema is inconsistent with the schema in the hoodie metadata directory," +
          s" hoodieTableSchema: ${hoodieTableSchema.simpleString}," +
          s" userDefinedSchema: ${userDefinedSchema.simpleString}")
    }
    true
  }
}

def validateTblProperties(hoodieCatalogTable: HoodieCatalogTable): Unit = {
if (hoodieCatalogTable.hoodieTableExists) {
val originTableConfig = hoodieCatalogTable.tableConfig.getProps.asScala.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory

import java.io.File
import java.util.TimeZone
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
Expand Down Expand Up @@ -70,7 +71,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
.config(sparkConf())
.getOrCreate()

private var tableId = 0
private var tableId = new AtomicInteger(0)

private var extraConf = Map[String, String]()

Expand Down Expand Up @@ -113,9 +114,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
}

// Returns a fresh, unique table name of the form "h<N>".
// The counter is bumped atomically so concurrently running suites never
// collide on a name. (The pre-AtomicInteger `var`-based lines left over in
// the diff were removed; they no longer compile against the AtomicInteger
// field and were racy besides.)
protected def generateTableName: String = {
  s"h${tableId.incrementAndGet()}"
}

override protected def afterAll(): Unit = {
Expand Down Expand Up @@ -215,7 +214,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
try {
f(tableName)
} finally {
spark.sql(s"drop table if exists $tableName")
spark.sql(s"drop table if exists $tableName purge")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1513,4 +1513,46 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
)
}
}

// Regression test for HUDI-8160: re-creating a table over an existing hoodie
// metadata directory must fail when the newly declared schema is inconsistent
// with the schema already persisted in that directory.
test("Test Create Table In Inconsistent Schemes") {
withTempDir { tmp =>
val parentPath = tmp.getCanonicalPath

// test the case that create same table after change schema
val tableName1 = generateTableName
// Step 1: create the table with `name: string` and write data so the schema
// is persisted in the hoodie metadata directory under the table location.
spark.sql(
s"""
|create table $tableName1 (
| id int,
| name string,
| ts long
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
| location '$parentPath/$tableName1'
""".stripMargin)
spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")
// Dropping the catalog entry leaves the hoodie metadata directory in place,
// since the table was created at an external location.
spark.sql(s"drop table $tableName1")

// Step 2: re-create at the same location but with `name` re-typed to
// map<string,string>; schema validation must reject the CREATE TABLE.
checkExceptionContain(
s"""
|create table $tableName1 (
| id int,
| name map<string, string>,
| ts long
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
| location '$parentPath/$tableName1'
""".stripMargin
)("Failed to create catalog table in metastore")
}
}
}

0 comments on commit 4e98278

Please sign in to comment.