From bc822dcbcc2a529e2bdec89ad6c4a76d2a118142 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 30 Sep 2024 21:59:02 +0530 Subject: [PATCH 1/4] [HUDI-8025] Add tests for index updates with table services --- .../functional/TestPartitionStatsIndex.scala | 130 +++++++++++- .../TestSecondaryIndexPruning.scala | 187 +++++++++++++++++- 2 files changed, 310 insertions(+), 7 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala index 1dddeb3ffddb..a516a4665f91 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala @@ -22,14 +22,16 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy import org.apache.hudi.client.transaction.lock.InProcessLockProvider -import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType} import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.config.{HoodieCleanConfig, HoodieLockConfig, HoodieWriteConfig} +import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieWriteConflictException import org.apache.hudi.keygen.constant.KeyGeneratorOptions -import org.apache.hudi.metadata.HoodieMetadataFileSystemView -import org.apache.hudi.util.JFunction +import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieMetadataFileSystemView, MetadataPartitionType} +import org.apache.hudi.util.{JFunction, JavaConversions} import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} @@ -224,6 +226,126 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { assertEquals(67, snapshot0.count()) } + /** + * Test case to do updates and then validate partition stats with cleaning. 
+ */ + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testPartitionStatsWithCompactionAndCleaning(tableType: HoodieTableType): Unit = { + var hudiOpts = commonOpts ++ Map( + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), + HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1") + if (tableType == HoodieTableType.MERGE_ON_READ) { + hudiOpts = hudiOpts ++ Map( + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2", + HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0", + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15" + ) + } + // insert followed by two upserts (trigger a compaction so that prev version can be cleaned) + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + // Clean Operation + val lastCleanInstant = getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant() + assertTrue(lastCleanInstant.isPresent) + // do another upsert and validate the partition stats including file pruning + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + + validateDataAndPartitionStats() + createTempTable(hudiOpts) + verifyQueryPredicate(hudiOpts) + } + + /** + * Test case to do updates and then validate partition stats with clustering. + */ + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testPartitionStatsWithClustering(tableType: HoodieTableType): Unit = { + val hudiOpts = commonOpts ++ Map( + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), + HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true", + HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2", + KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key() -> "true") + + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + // validate clustering instant + val lastClusteringInstant = getLatestClusteringInstant + assertTrue(getLatestClusteringInstant.isPresent) + // do two more rounds of upsert to trigger another clustering + doWriteAndValidateDataAndPartitionStats(hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + doWriteAndValidateDataAndPartitionStats(hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + assertTrue(getLatestClusteringInstant.get().getTimestamp.compareTo(lastClusteringInstant.get().getTimestamp) > 0) + assertEquals(getLatestClusteringInstant, metaClient.getActiveTimeline.lastInstant()) + // We are validating rollback of a DT clustering instant here + rollbackLastInstant(hudiOpts) + + validateDataAndPartitionStats() + createTempTable(hudiOpts) + verifyQueryPredicate(hudiOpts) + } + + /** + * Test case to do updates and then validate partition stats with MDT compaction. 
+ * Any one table type is enough to test this as we are validating the metadata table. + */ + @Test + def testPartitionStatsWithMDTCompaction(): Unit = { + val hudiOpts = commonOpts ++ Map( + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) + // validate MDT compaction instant + val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable + .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView + try { + val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants() + val lastCompactionInstant = compactionTimeline + .filter(JavaConversions.getPredicate((instant: HoodieInstant) => + HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata]) + .getOperationType == WriteOperationType.COMPACT)) + .lastInstant() + val compactionBaseFile = metadataTableFSView.getAllBaseFiles(MetadataPartitionType.PARTITION_STATS.getPartitionPath) + .filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(lastCompactionInstant.get().getTimestamp))) + .findAny() + assertTrue(compactionBaseFile.isPresent) + } finally { + metadataTableFSView.close() + } + } + def verifyQueryPredicate(hudiOpts: Map[String, String]): Unit = { val reckey = mergedDfList.last.limit(1).collect().map(row => row.getAs("_row_key").toString) val dataFilter = EqualTo(attribute("_row_key"), Literal(reckey(0))) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 6131ce152212..42ff27bed93f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -19,13 +19,14 @@ package org.apache.hudi.functional -import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} +import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy import org.apache.hudi.client.transaction.lock.InProcessLockProvider import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} -import org.apache.hudi.common.model.{FileSlice, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode} +import org.apache.hudi.common.model._ import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieWriteConflictException @@ -33,7 +34,7 @@ import org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCa import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, 
SparkHoodieBackedTableMetadataWriter} import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf -import org.apache.hudi.util.JFunction +import org.apache.hudi.util.{JFunction, JavaConversions} import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, HoodieSparkUtils} import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal} @@ -486,6 +487,186 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { } } + /** + * Test case to write with updates and validate secondary index with multiple writers. + */ + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + def testSecondaryIndexPruningWithCleaning(tableType: HoodieTableType): Unit = { + if (HoodieSparkUtils.gteqSpark3_3) { + var hudiOpts = commonOpts + hudiOpts = hudiOpts + ( + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1") + if (tableType == HoodieTableType.MERGE_ON_READ) { + hudiOpts = hudiOpts ++ Map( + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2", + HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0", + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15" + ) + } + val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" else "mor" + tableName += "test_secondary_index_pruning_compact_clean_" + sqlTableType + + spark.sql( + s""" + |create table $tableName ( + | ts bigint, + | record_key_col string, + | not_record_key_col string, + | partition_key_col string + |) using hudi + | options ( + | primaryKey ='record_key_col', + | type = '$sqlTableType', + | hoodie.metadata.enable = 'true', + | hoodie.metadata.record.index.enable = 'true', + | hoodie.datasource.write.recordkey.field = 'record_key_col', + | hoodie.enable.data.skipping = 'true', + | hoodie.clean.policy = 'KEEP_LATEST_COMMITS', + | ${HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key} = '1' + | ) + | partitioned by(partition_key_col) + | location '$basePath' + """.stripMargin) + // by setting small file limit to 0, each insert will create a new file + // need to generate more file for non-partitioned table to test data skipping + // as the partitioned table will have only one file per partition + spark.sql("set hoodie.parquet.small.file.limit=0") + if (tableType == HoodieTableType.MERGE_ON_READ) { + spark.sql("set hoodie.compact.inline=true") + spark.sql("set hoodie.compact.inline.max.delta.commits=2") + spark.sql("set hoodie.metadata.compact.num.delta.commits=15") + } + spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')") + spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')") + spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')") + // create secondary index + spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)") + // validate index created successfully + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(HoodieTestUtils.getDefaultStorageConf) + .build() + assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col")) + // validate the secondary index records themselves + checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from 
hudi_metadata('$basePath') where type=7")( + Seq("abc", "row1"), + Seq("cde", "row2"), + Seq("def", "row3") + ) + // validate data skipping with filters on secondary key column + spark.sql("set hoodie.metadata.enable=true") + spark.sql("set hoodie.enable.data.skipping=true") + spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict") + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'abc'")( + Seq(1, "row1", "abc", "p1") + ) + verifyQueryPredicate(hudiOpts, "not_record_key_col") + + // update the secondary key column + spark.sql(s"update $tableName set not_record_key_col = 'xyz' where record_key_col = 'row1'") + // validate the secondary index records themselves + checkAnswer(s"select key, SecondaryIndexMetadata.recordKey, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( + Seq("abc", "row1", true), + Seq("cde", "row2", false), + Seq("def", "row3", false), + Seq("xyz", "row1", false) + ) + // validate data and data skipping + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where record_key_col = 'row1'")( + Seq(1, "row1", "xyz", "p1") + ) + verifyQueryPredicate(hudiOpts, "not_record_key_col") + } + } + + /** + * Test case to write with updates and validate secondary index with multiple writers. + * Any one table type is enough to test this as we are validating the metadata table. + */ + @Test + def testSecondaryIndexWithMDTCompaction(): Unit = { + if (HoodieSparkUtils.gteqSpark3_3) { + val hudiOpts = commonOpts ++ Map( + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1" + ) + val tableName = "test_secondary_index_with_mdt_compaction" + + spark.sql( + s""" + |create table $tableName ( + | ts bigint, + | record_key_col string, + | not_record_key_col string, + | partition_key_col string + |) using hudi + | options ( + | primaryKey ='record_key_col', + | hoodie.metadata.enable = 'true', + | hoodie.metadata.record.index.enable = 'true', + | hoodie.datasource.write.recordkey.field = 'record_key_col', + | hoodie.enable.data.skipping = 'true', + | hoodie.clean.policy = 'KEEP_LATEST_COMMITS', + | ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '1' + | ) + | partitioned by(partition_key_col) + | location '$basePath' + """.stripMargin) + // by setting small file limit to 0, each insert will create a new file + // need to generate more file for non-partitioned table to test data skipping + // as the partitioned table will have only one file per partition + spark.sql("set hoodie.parquet.small.file.limit=0") + spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')") + spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')") + // create secondary index + spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)") + // validate index created successfully + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(HoodieTestUtils.getDefaultStorageConf) + .build() + assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col")) + + // do another insert and validate compaction in metadata table + spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')") + val metadataTableFSView: HoodieMetadataFileSystemView = getTableFileSystemView(hudiOpts) + try { + val compactionTimeline = 
metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants() + val lastCompactionInstant = compactionTimeline + .filter(JavaConversions.getPredicate((instant: HoodieInstant) => + HoodieCommitMetadata.fromBytes(compactionTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata]) + .getOperationType == WriteOperationType.COMPACT)) + .lastInstant() + val compactionBaseFile = metadataTableFSView.getAllBaseFiles("secondary_index_idx_not_record_key_col") + .filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(lastCompactionInstant.get().getTimestamp))) + .findAny() + // TODO: fix secondary index compaction + // assertTrue(compactionBaseFile.isPresent) + } finally { + metadataTableFSView.close() + } + + // validate the secondary index records themselves + checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")( + Seq("abc", "row1"), + Seq("cde", "row2"), + Seq("def", "row3") + ) + // validate data skipping with filters on secondary key column + spark.sql("set hoodie.metadata.enable=true") + spark.sql("set hoodie.enable.data.skipping=true") + spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict") + checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'abc'")( + Seq(1, "row1", "abc", "p1") + ) + verifyQueryPredicate(hudiOpts, "not_record_key_col") + } + } + private def checkAnswer(query: String)(expects: Seq[Any]*): Unit = { assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(query).collect().sortBy(_.toString())) } From 348a260f2341012f45006478d189ee8b2f747b82 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 3 Oct 2024 20:33:57 +0530 Subject: [PATCH 2/4] address comments --- .../functional/TestPartitionStatsIndex.scala | 15 ++++++-- .../TestSecondaryIndexPruning.scala | 37 +++++++++---------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala index a516a4665f91..cba98f4a8342 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala @@ -239,8 +239,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { hudiOpts = hudiOpts ++ Map( HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2", - HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0", - HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15" + HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0" ) } // insert followed by two upserts (trigger a compaction so that prev version can be cleaned) @@ -259,6 +258,12 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { // Clean Operation val lastCleanInstant = getLatestMetaClient(false).getActiveTimeline.getCleanerTimeline.lastInstant() assertTrue(lastCleanInstant.isPresent) + // validation that the compaction commit is present in case of MOR table + if (tableType == HoodieTableType.MERGE_ON_READ) { + val lastCompactionInstant = getLatestMetaClient(false).getActiveTimeline.getCommitTimeline.filterCompletedInstants().lastInstant() + 
assertTrue(lastCompactionInstant.isPresent) + } + // do another upsert and validate the partition stats including file pruning doWriteAndValidateDataAndPartitionStats( hudiOpts, @@ -317,7 +322,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { @Test def testPartitionStatsWithMDTCompaction(): Unit = { val hudiOpts = commonOpts ++ Map( - HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1" + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "2" ) doWriteAndValidateDataAndPartitionStats( hudiOpts, @@ -327,6 +332,10 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { hudiOpts, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, saveMode = SaveMode.Append) + doWriteAndValidateDataAndPartitionStats( + hudiOpts, + operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Append) // validate MDT compaction instant val metadataTableFSView = getHoodieTable(metaClient, getWriteConfig(hudiOpts)).getMetadataTable .asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 42ff27bed93f..5d03694b8bba 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -19,19 +19,20 @@ package org.apache.hudi.functional -import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy import org.apache.hudi.client.transaction.lock.InProcessLockProvider import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} -import org.apache.hudi.common.model._ +import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieWriteConflictException import org.apache.hudi.functional.TestSecondaryIndexPruning.SecondaryIndexTestCase -import org.apache.hudi.metadata.{HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter} +import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieBackedTableMetadataWriter, HoodieMetadataFileSystemView, SparkHoodieBackedTableMetadataWriter} +import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf import org.apache.hudi.util.{JFunction, JavaConversions} @@ -390,9 +391,8 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { def testSecondaryIndexWithConcurrentWrites(tableType: HoodieTableType): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { val tableName = 
"hudi_multi_writer_table_" + tableType.name() - - // Common Hudi options - val hudiOpts = commonOpts ++ Map( + var hudiOpts = commonOpts + hudiOpts = hudiOpts + ( DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), HoodieWriteConfig.TBL_NAME.key -> tableName, HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name, @@ -492,19 +492,17 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { */ @ParameterizedTest @EnumSource(classOf[HoodieTableType]) - def testSecondaryIndexPruningWithCleaning(tableType: HoodieTableType): Unit = { + def testSecondaryIndexWithCompactionAndCleaning(tableType: HoodieTableType): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { var hudiOpts = commonOpts hudiOpts = hudiOpts + ( DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), - DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1") if (tableType == HoodieTableType.MERGE_ON_READ) { - hudiOpts = hudiOpts ++ Map( + hudiOpts = hudiOpts + ( HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2", - HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0", - HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "15" + HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0" ) } val sqlTableType = if (tableType == HoodieTableType.COPY_ON_WRITE) "cow" else "mor" @@ -590,10 +588,10 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { @Test def testSecondaryIndexWithMDTCompaction(): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { - val hudiOpts = commonOpts ++ Map( - DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", - HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "1" - ) + var hudiOpts = commonOpts + hudiOpts = hudiOpts + ( + DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, + HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "2") val tableName = "test_secondary_index_with_mdt_compaction" spark.sql( @@ -606,12 +604,13 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { |) using hudi | options ( | primaryKey ='record_key_col', + | type = 'mor', | hoodie.metadata.enable = 'true', | hoodie.metadata.record.index.enable = 'true', | hoodie.datasource.write.recordkey.field = 'record_key_col', | hoodie.enable.data.skipping = 'true', | hoodie.clean.policy = 'KEEP_LATEST_COMMITS', - | ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '1' + | ${HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key} = '2' | ) | partitioned by(partition_key_col) | location '$basePath' @@ -633,7 +632,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { // do another insert and validate compaction in metadata table spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')") - val metadataTableFSView: HoodieMetadataFileSystemView = getTableFileSystemView(hudiOpts) + val metadataTableFSView = HoodieSparkTable.create(getWriteConfig(hudiOpts), context()).getMetadataTable.asInstanceOf[HoodieBackedTableMetadata].getMetadataFileSystemView try { val compactionTimeline = metadataTableFSView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants() val lastCompactionInstant = compactionTimeline @@ -644,8 +643,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { val compactionBaseFile = metadataTableFSView.getAllBaseFiles("secondary_index_idx_not_record_key_col") 
.filter(JavaConversions.getPredicate((f: HoodieBaseFile) => f.getCommitTime.equals(lastCompactionInstant.get().getTimestamp))) .findAny() - // TODO: fix secondary index compaction - // assertTrue(compactionBaseFile.isPresent) + assertTrue(compactionBaseFile.isPresent) } finally { metadataTableFSView.close() } @@ -663,7 +661,6 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { checkAnswer(s"select ts, record_key_col, not_record_key_col, partition_key_col from $tableName where not_record_key_col = 'abc'")( Seq(1, "row1", "abc", "p1") ) - verifyQueryPredicate(hudiOpts, "not_record_key_col") } } From 15b0f7637d82ad026ecacb4f7a71653f5fb364df Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 3 Oct 2024 22:15:28 +0530 Subject: [PATCH 3/4] fix for spark3.5 --- .../hudi/functional/TestSecondaryIndexPruning.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 5d03694b8bba..cd2d6c125693 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -391,8 +391,9 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { def testSecondaryIndexWithConcurrentWrites(tableType: HoodieTableType): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { val tableName = "hudi_multi_writer_table_" + tableType.name() - var hudiOpts = commonOpts - hudiOpts = hudiOpts + ( + + // Common Hudi options + val hudiOpts = commonOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), HoodieWriteConfig.TBL_NAME.key -> tableName, HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name, @@ -495,7 +496,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { def testSecondaryIndexWithCompactionAndCleaning(tableType: HoodieTableType): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { var hudiOpts = commonOpts - hudiOpts = hudiOpts + ( + hudiOpts = hudiOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1") if (tableType == HoodieTableType.MERGE_ON_READ) { @@ -589,7 +590,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { def testSecondaryIndexWithMDTCompaction(): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { var hudiOpts = commonOpts - hudiOpts = hudiOpts + ( + hudiOpts = hudiOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key() -> "2") val tableName = "test_secondary_index_with_mdt_compaction" From 6af9888c1d24163190f80f3fcbd5c401b7f3bfa7 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 4 Oct 2024 10:09:26 +0530 Subject: [PATCH 4/4] fix scala2.13 issue --- .../hudi/functional/TestSecondaryIndexPruning.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index cd2d6c125693..a334504837f8 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -85,7 +85,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { val tableType = testCase.tableType val isPartitioned = testCase.isPartitioned var hudiOpts = commonOpts - hudiOpts = hudiOpts + ( + hudiOpts = hudiOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> tableType, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor" @@ -164,7 +164,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { def testCreateAndDropSecondaryIndex(): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { var hudiOpts = commonOpts - hudiOpts = hudiOpts + ( + hudiOpts = hudiOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") tableName += "test_secondary_index_create_drop_partitioned_mor" @@ -226,7 +226,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { val tableType = testCase.tableType val isPartitioned = testCase.isPartitioned var hudiOpts = commonOpts - hudiOpts = hudiOpts + ( + hudiOpts = hudiOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> tableType, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor" @@ -305,7 +305,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { val tableType = testCase.tableType val isPartitioned = testCase.isPartitioned var hudiOpts = commonOpts - hudiOpts = hudiOpts + ( + hudiOpts = hudiOpts ++ Map( DataSourceWriteOptions.TABLE_TYPE.key -> tableType, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor" @@ -500,7 +500,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "1") if (tableType == HoodieTableType.MERGE_ON_READ) { - hudiOpts = hudiOpts + ( + hudiOpts = hudiOpts ++ Map( HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0"
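
A note on the validation pattern used above: both TestPartitionStatsIndex and TestSecondaryIndexPruning repeat the same check after triggering a metadata-table compaction — locate the latest completed COMPACT instant on the metadata timeline, then assert that the index partition contains a base file stamped with that instant time. The sketch below distills that repeated block into one helper. It is not part of the patch: the object and method names are illustrative, a plain Scala assert stands in for the JUnit assertions the tests use, and it relies only on the Hudi APIs the diff already calls (getVisibleCommitsAndCompactionTimeline, getInstantDetails, getAllBaseFiles, JavaConversions.getPredicate).

import org.apache.hudi.common.model.{HoodieBaseFile, HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.util.JavaConversions

object MdtCompactionValidation {

  /** Hypothetical helper: assert that the given metadata-table partition (e.g.
   *  MetadataPartitionType.PARTITION_STATS.getPartitionPath or
   *  "secondary_index_idx_not_record_key_col") was rewritten by the latest MDT compaction. */
  def assertPartitionCompacted(fsView: HoodieMetadataFileSystemView, mdtPartitionPath: String): Unit = {
    try {
      val timeline = fsView.getVisibleCommitsAndCompactionTimeline.filterCompletedAndCompactionInstants()
      // keep only instants whose commit metadata records a COMPACT operation
      val lastCompaction = timeline
        .filter(JavaConversions.getPredicate((instant: HoodieInstant) =>
          HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
            .getOperationType == WriteOperationType.COMPACT))
        .lastInstant()
      assert(lastCompaction.isPresent, "no completed compaction found on the metadata timeline")
      // a base file carrying the compaction instant time proves the partition was compacted
      val compactedBaseFile = fsView.getAllBaseFiles(mdtPartitionPath)
        .filter(JavaConversions.getPredicate((f: HoodieBaseFile) =>
          f.getCommitTime.equals(lastCompaction.get().getTimestamp)))
        .findAny()
      assert(compactedBaseFile.isPresent,
        s"no base file in $mdtPartitionPath written by compaction ${lastCompaction.get().getTimestamp}")
    } finally {
      // the tests release the file-system view in a finally block; do the same here
      fsView.close()
    }
  }
}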