Skip to content

Commit

Permalink
[HUDI-8025] Add tests for index updates with table services
Browse files Browse the repository at this point in the history
  • Loading branch information
codope committed Sep 30, 2024
1 parent 4e98278 commit 3a488a4
Show file tree
Hide file tree
Showing 2 changed files with 310 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieCleanConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.util.JFunction
import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieMetadataFileSystemView, MetadataPartitionType}
import org.apache.hudi.util.{JFunction, JavaConversions}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
Expand Down Expand Up @@ -224,6 +226,126 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
assertEquals(67, snapshot0.count())
}

/**
* Test case to do updates and then validate partition stats with cleaning.
*/
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testPartitionStatsWithCompactionAndCleaning(tableType: HoodieTableType): Unit = {
var hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
if (tableType == HoodieTableType.MERGE_ON_READ) {
hudiOpts = hudiOpts ++ Map(
HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15"
)
}
// insert followed by two upserts (trigger a compaction so that prev version can be cleaned)
doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite)
doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
// Clean Operation
val lastCleanInstant = getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant()
assertTrue(lastCleanInstant.isPresent)
// do another upsert and validate the partition stats including file pruning
doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)

validateDataAndPartitionStats()
createTempTable(hudiOpts)
verifyQueryPredicate(hudiOpts)
}

/**
* Test case to do updates and then validate partition stats with clustering.
*/
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testPartitionStatsWithClustering(tableType: HoodieTableType): Unit = {
val hudiOpts = commonOpts ++ Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2",
KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key() -> "true")

doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite)
doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
// validate clustering instant
val lastClusteringInstant = getLatestClusteringInstant
assertTrue(getLatestClusteringInstant.isPresent)
// do two more rounds of upsert to trigger another clustering
doWriteAndValidateDataAndPartitionStats(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
doWriteAndValidateDataAndPartitionStats(hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
assertTrue(getLatestClusteringInstant.get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp) > 0)
assertEquals(getLatestClusteringInstant, metaClient.getActiveTimeline.lastInstant())
// We are validating rollback of a DT clustering instant here
rollbackLastInstant(hudiOpts)

validateDataAndPartitionStats()
createTempTable(hudiOpts)
verifyQueryPredicate(hudiOpts)
}

/**
* Test case to do updates and then validate partition stats with MDT compaction.
* Any one table type is enough to test this as we are validating the metadata table.
*/
@Test
def testPartitionStatsWithMDTCompaction(): Unit = {
val hudiOpts = commonOpts ++ Map(
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1"
)
doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Overwrite)
doWriteAndValidateDataAndPartitionStats(
hudiOpts,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)
// validate MDT compaction instant
val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable
.asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView
try {
val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
val lastCompactionInstant = compactionTimeline
.filter(JavaConversions.getPredicate((instant: HoodieInstant) =>
HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
.getOperationType == WriteOperationType.COMPACT))
.lastInstant()
val compactionBaseFile = metadataTableFSView.getAllBaseFiles(MetadataPartitionType.PARTITION_STATS.getPartitionPath)
.filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(lastCompactionInstant.get().getTimestamp)))
.findAny()
assertTrue(compactionBaseFile.isPresent)
} finally {
metadataTableFSView.close()
}
}

def verifyQueryPredicate(hudiOpts: Map[String, String]): Unit = {
val reckey = mergedDfList.last.limit(1).collect().map(row => row.getAs("_row_key").toString)
val dataFilter = EqualTo(attribute("_row_key"), Literal(reckey(0)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@

package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieWriteConflictException
import org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase
import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
import org.apache.hudi.util.JFunction
import org.apache.hudi.util.{JFunction, JavaConversions}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, HoodieSparkUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
Expand Down Expand Up @@ -486,6 +487,186 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}

/**
* Test case to write with updates and validate secondary index with multiple writers.
*/
@ParameterizedTest
@EnumSource(classOf[HoodieTableType])
def testSecondaryIndexPruningWithCleaning(tableType: HoodieTableType): Unit = {
if (HoodieSparkUtils.gteqSpark3_3) {
var hudiOpts = commonOpts
hudiOpts = hudiOpts + (
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1")
if (tableType == HoodieTableType.MERGE_ON_READ) {
hudiOpts = hudiOpts ++ Map(
HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2",
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15"
)
}
val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" else "mor"
tableName += "test_secondary_index_pruning_compact_clean_" + sqlTableType

spark.sql(
s"""
|create table $tableName (
| ts bigint,
| record_key_col string,
| not_record_key_col string,
| partition_key_col string
|) using hudi
| options (
| primaryKey ='record_key_col',
| type = '$sqlTableType',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
| hoodie.enable.data.skipping = 'true',
| hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
| ${HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key} = '1'
| )
| partitioned by(partition_key_col)
| location '$basePath'
""".stripMargin)
// by setting small file limit to 0, each insert will create a new file
// need to generate more file for non-partitioned table to test data skipping
// as the partitioned table will have only one file per partition
spark.sql("set hoodie.parquet.small.file.limit=0")
if (tableType == HoodieTableType.MERGE_ON_READ) {
spark.sql("set hoodie.compact.inline=true")
spark.sql("set hoodie.compact.inline.max.delta.commits=2")
spark.sql("set hoodie.metadata.compact.num.delta.commits=15")
}
spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
// create secondary index
spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)")
// validate index created successfully
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")(
Seq("abc", "row1"),
Seq("cde", "row2"),
Seq("def", "row3")
)
// validate data skipping with filters on secondary key column
spark.sql("set hoodie.metadata.enable=true")
spark.sql("set hoodie.enable.data.skipping=true")
spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'abc'")(
Seq(1, "row1", "abc", "p1")
)
verifyQueryPredicate(hudiOpts, "not_record_key_col")

// update the secondary key column
spark.sql(s"update $tableName set not_record_key_col = 'xyz' where record_key_col = 'row1'")
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
Seq("abc", "row1", true),
Seq("cde", "row2", false),
Seq("def", "row3", false),
Seq("xyz", "row1", false)
)
// validate data and data skipping
checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")(
Seq(1, "row1", "xyz", "p1")
)
verifyQueryPredicate(hudiOpts, "not_record_key_col")
}
}

/**
* Test case to write with updates and validate secondary index with multiple writers.
* Any one table type is enough to test this as we are validating the metadata table.
*/
@Test
def testSecondaryIndexWithMDTCompaction(): Unit = {
if (HoodieSparkUtils.gteqSpark3_3) {
val hudiOpts = commonOpts ++ Map(
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1"
)
val tableName = "test_secondary_index_with_mdt_compaction"

spark.sql(
s"""
|create table $tableName (
| ts bigint,
| record_key_col string,
| not_record_key_col string,
| partition_key_col string
|) using hudi
| options (
| primaryKey ='record_key_col',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
| hoodie.enable.data.skipping = 'true',
| hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
| ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '1'
| )
| partitioned by(partition_key_col)
| location '$basePath'
""".stripMargin)
// by setting small file limit to 0, each insert will create a new file
// need to generate more file for non-partitioned table to test data skipping
// as the partitioned table will have only one file per partition
spark.sql("set hoodie.parquet.small.file.limit=0")
spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
// create secondary index
spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)")
// validate index created successfully
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))

// do another insert and validate compaction in metadata table
spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
val metadataTableFSView: HoodieMetadataFileSystemView = getTableFileSystemView(hudiOpts)
try {
val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
val lastCompactionInstant = compactionTimeline
.filter(JavaConversions.getPredicate((instant: HoodieInstant) =>
HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
.getOperationType == WriteOperationType.COMPACT))
.lastInstant()
val compactionBaseFile = metadataTableFSView.getAllBaseFiles("secondary_index_idx_not_record_key_col")
.filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(lastCompactionInstant.get().getTimestamp)))
.findAny()
// TODO: fix secondary index compaction
// assertTrue(compactionBaseFile.isPresent)
} finally {
metadataTableFSView.close()
}

// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")(
Seq("abc", "row1"),
Seq("cde", "row2"),
Seq("def", "row3")
)
// validate data skipping with filters on secondary key column
spark.sql("set hoodie.metadata.enable=true")
spark.sql("set hoodie.enable.data.skipping=true")
spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'abc'")(
Seq(1, "row1", "abc", "p1")
)
verifyQueryPredicate(hudiOpts, "not_record_key_col")
}
}

private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = {
assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString()))
}
Expand Down

0 comments on commit 3a488a4

Please sign in to comment.