diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 135bd48c0366..ae572ce2354a 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -320,7 +320,7 @@ private static String printAllCompactions(HoodieDefaultTimeline timeline, .filter(pair -> pair.getRight() != null) .collect(Collectors.toList()); - Set<String> committedInstants = timeline.getCommitTimeline().filterCompletedInstants() + Set<String> committedInstants = timeline.getCommitAndReplaceTimeline().filterCompletedInstants() .getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); List<Comparable[]> rows = new ArrayList<>(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java index 16429f653759..a6a3048615bd 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java @@ -247,7 +247,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m HoodieTimeline timeline; if (basefileOnly) { - timeline = metaClient.getActiveTimeline().getCommitTimeline(); + timeline = metaClient.getActiveTimeline().getCommitAndReplaceTimeline(); } else if (excludeCompaction) { timeline = metaClient.getActiveTimeline().getCommitsTimeline(); } else { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index d3c301430729..696740daae68 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -232,7 +232,7 @@ storage, new StoragePath(logFilePathPattern)).stream() .withReaderSchema(readerSchema) .withLatestInstantTime( client.getActiveTimeline() - .getCommitTimeline().lastInstant().get().getTimestamp()) + .getCommitAndReplaceTimeline().lastInstant().get().getTimestamp()) .withReverseReader( Boolean.parseBoolean( HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue())) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 683c6351e44e..0f37190eee6f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -118,7 +118,7 @@ public String addPartitionMeta( HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); String latestCommit = - client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp(); List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.storage, client.getBasePath()); StoragePath basePath = client.getBasePathV2(); @@ -240,7 +240,7 @@ public String migratePartitionMeta( Option<StoragePath> baseFormatFile = HoodiePartitionMetadata.baseFormatMetaPathIfExists(HoodieCLI.storage, partition); String latestCommit = - client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); +
client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp(); String[] row = new String[] { partitionPath, diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java index f8e60ba8cee1..9f859bf72bfc 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java @@ -69,7 +69,7 @@ public String writeAmplificationStats( long totalRecordsWritten = 0; HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = activeTimeline.getCommitAndReplaceTimeline().filterCompletedInstants(); List rows = new ArrayList<>(); DecimalFormat df = new DecimalFormat("#.00"); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java index 21910fd956df..12322617fb2d 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java @@ -36,7 +36,7 @@ public class CommitUtil { public static long countNewRecords(HoodieTableMetaClient metaClient, List commitsToCatchup) throws IOException { long totalNew = 0; - HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); for (String commit : commitsToCatchup) { HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes( timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(), diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java index 6e831a996ba9..019d7d6a3828 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java @@ -207,7 +207,7 @@ private void testRefreshCommand(String command) throws IOException { assertTrue(prepareTable()); HoodieTimeline timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(0, timeline.countInstants(), "There should have no instant at first"); // generate four savepoints @@ -218,14 +218,14 @@ private void testRefreshCommand(String command) throws IOException { // Before refresh, no instant timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(0, timeline.countInstants(), "there should have no instant"); Object result = shell.evaluate(() -> command); assertTrue(ShellEvaluationResultUtil.isSuccess(result)); timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); // After refresh, there are 4 instants assertEquals(4, timeline.countInstants(), "there should have 4 instants"); diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java index 8f1d07b4eb56..ced1cf7a3ef0 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java @@ -137,7 +137,7 @@ public void testRollbackToSavepoint() throws IOException { assertEquals(1, timeline.getRestoreTimeline().countInstants()); // 103 instant had rollback - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103"))); } @@ -182,9 +182,9 @@ public void testRollbackToSavepointWithMetadataTableEnable() throws Exception { assertEquals(1, timeline.getRestoreTimeline().countInstants()); // 103 and 104 instant had rollback - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103"))); - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104"))); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java index 99b5d833f509..6023b17ce0d2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java @@ -143,7 +143,7 @@ public static Option loadMetadata(HoodieTable t && maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) { return loadMetadataFromGivenFile(table, maxMetadataFile); } - HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); // fix the in-consistency between un-committed and committed hashing metadata files. 
List fixed = new ArrayList<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 4fc3271e8a05..f77efda01a59 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1357,7 +1357,7 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient) { protected void cleanIfNecessary(BaseHoodieWriteClient writeClient) { Option lastCompletedCompactionInstant = metadataMetaClient.reloadActiveTimeline() - .getCommitTimeline().filterCompletedInstants().lastInstant(); + .getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant(); if (lastCompletedCompactionInstant.isPresent() && metadataMetaClient.getActiveTimeline().filterCompletedInstants() .findInstantsAfter(lastCompletedCompactionInstant.get().getTimestamp()).countInstants() < 3) { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java index 8703ffb9de0c..7084ae013e4f 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java @@ -132,7 +132,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) // for new inserts, compute buckets depending on how many records we have for each partition Set partitionPaths = profile.getPartitionPaths(); long averageRecordSize = - averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(), config); LOG.info("AvgRecordSize => " + averageRecordSize); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 7603e776b92e..ae17a34da3e7 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -1729,7 +1729,7 @@ public void testMetadataMultiWriter() throws Exception { assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); // Compaction may occur if the commits completed in order - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClients[0]); @@ -1776,7 +1776,7 @@ public void testMultiWriterForDoubleLocking() throws Exception { // 6 commits and 2 cleaner commits. 
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); } @@ -2049,7 +2049,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction // deltacommits (1 will be due to bootstrap) HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 0); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2059,7 +2059,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { client.startCommitWithTime(newCommitTime); client.insert(records, newCommitTime); metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2080,7 +2080,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure no more compactions took place due to the leftover inflight commit metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */); @@ -2095,7 +2095,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure compactions took place metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 2); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */)); assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0); @@ -2445,7 +2445,7 @@ public void testRepeatedActionWithSameInstantTime() throws Exception { client.upsert(records, newCommitTime); } } - assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3); + 
assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 3); try (HoodieJavaWriteClient client = new HoodieJavaWriteClient(engineContext, writeConfig)) { // Perform a clean diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java index 8e9cbce0c92d..233b1592a6bb 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java @@ -517,7 +517,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, 0, 150); HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false); - List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants(); + List<HoodieInstant> instants = activeTimeline.getCommitAndReplaceTimeline().getInstants(); assertEquals(5, instants.size()); assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), instants.get(0)); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java index 61429b3fef2e..871d9ffad7de 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java @@ -867,7 +867,7 @@ private List<WriteStatus> getWriteStatusAndVerifyDeleteOperation(String newCommi // verify that there is a commit HoodieTableMetaClient metaClient = createMetaClient(); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); if (assertForCommit) { assertEquals(3, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 007097a0a6cd..230f684d165e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -135,7 +135,7 @@ public void readLocalWriteHDFS() throws Exception { // Read from hdfs FileSystem fs = HadoopFSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultStorageConf()); HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(HadoopFSUtils.getStorageConf(fs.getConf()), dfsBasePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime); assertEquals(readRecords.count(), records.size()); @@ -156,7 +156,7 @@ public void readLocalWriteHDFS() throws Exception { LOG.info("Reading from path: " + tablePath); fs = HadoopFSUtils.getFs(tablePath, HoodieTestUtils.getDefaultStorageConf()); metaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(fs.getConf()), tablePath); - timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); Dataset localReadRecords = HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime); assertEquals(localReadRecords.count(), localRecords.size()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index aeb0627744ef..9ed2dce3ce54 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -309,7 +309,7 @@ public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws Excep (String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, true, numRecords, 3 * numRecords, 6, false); // new commit - HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("006")); checkReadRecords("000", 3 * numRecords); @@ -333,7 +333,7 @@ public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws Excep private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException { if (tableType == HoodieTableType.COPY_ON_WRITE) { - HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline(); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline(); assertEquals(numExpectedRecords, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of(instantTime))); } else { // TODO: This code fails to read records under the following conditions: diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 97ed59fef416..700b9f1cd248 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -2217,7 +2217,7 @@ public void testMetadataMultiWriter() throws Exception { assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); // Compaction may occur if the commits completed in order - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClients[0]); @@ -2265,7 +2265,7 @@ public void testMultiWriterForDoubleLocking() throws Exception { // 6 commits and 2 cleaner commits. 
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); } @@ -2531,7 +2531,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction // deltacommits (1 will be due to bootstrap) HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 0); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2541,7 +2541,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { client.startCommitWithTime(newCommitTime); client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2562,7 +2562,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure no more compactions took place due to the leftover inflight commit metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */); @@ -2577,7 +2577,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure compactions took place metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 2); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */)); assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0); @@ -3210,7 +3210,7 @@ public void testRepeatedActionWithSameInstantTime() throws Exception { client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); } } - assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3); + 
assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 3); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { // Perform a clean diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index a40a3c4eaead..ed5fed859464 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -683,7 +683,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, 0, 150); HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false); - List instants = activeTimeline.getCommitTimeline().getInstants(); + List instants = activeTimeline.getCommitAndReplaceTimeline().getInstants(); assertEquals(5, instants.size()); assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), instants.get(0)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index c451f4bd938e..ad612ee5c9b9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -121,7 +121,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap // verify that there is a commit metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 001"); @@ -147,7 +147,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap // verify that there are 2 commits metaClient = HoodieTableMetaClient.reload(metaClient); - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(2, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting two commits."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 002"); Dataset dataSet = getRecords(); @@ -167,7 +167,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap // verify that there are now 3 commits metaClient = HoodieTableMetaClient.reload(metaClient); - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(3, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting three commits."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 003"); dataSet = getRecords(); @@ -197,7 +197,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap assertNoWriteErrors(statuses); // 
verify there are now 4 commits - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(4, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting four commits."); assertEquals(timeline.lastInstant().get().getTimestamp(), newCommitTime, "Latest commit should be 004"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 4f76764ce980..5f85f4c9db30 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -612,7 +612,7 @@ public void testReadArchivedCompactionPlan() throws Exception { HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline(); // load instant details archivedTimeLine.loadCompactionDetailsInMemory("00000001", "00000011"); - List<HoodieInstant> compactionInstants = archivedTimeLine.getCommitTimeline().getInstants(); + List<HoodieInstant> compactionInstants = archivedTimeLine.getCommitAndReplaceTimeline().getInstants(); assertEquals(2, compactionInstants.size(), "Two compactions instants should be archived."); List<Option<byte[]>> planDetailsList = compactionInstants.stream().map(archivedTimeLine::getInstantDetails).collect(Collectors.toList()); assertTrue(planDetailsList.stream().allMatch(Option::isPresent), "All the compaction instants should have plan details."); @@ -1563,7 +1563,7 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception { // delta commits 11 till 12 are added later on without archival or compaction // mdt timeline [7, 8, 9, 10, a completed compaction commit] for i = 10 assertEquals(i - 5, metadataTableInstants.size()); - assertEquals(1, metadataTableMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()); + assertEquals(1, metadataTableMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants()); IntStream.range(7, i).forEach(j -> assertTrue(metadataTableInstants.contains( new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(j - 1))))); @@ -1573,7 +1573,7 @@ // till 17 are added later on without archival or compaction // mdt timeline: [10, a completed compaction commit, 11, ... 14, 15, ... 17] assertEquals(i - 8, metadataTableInstants.size()); - assertEquals(1, metadataTableMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()); + assertEquals(1, metadataTableMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants()); IntStream.range(10, i).forEach(j -> assertTrue(metadataTableInstants.contains( new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(j - 1))))); @@ -1582,7 +1582,7 @@ // commits in MDT [10, a completed compaction commit, 11, ... 17, 18, a completed compaction commit] // another compaction is triggered by this commit so everything upto 18 is compacted.
assertEquals(11, metadataTableInstants.size()); - assertEquals(2, metadataTableMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants()); + assertEquals(2, metadataTableMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants()); IntStream.range(10, i).forEach(j -> assertTrue(metadataTableInstants.contains( new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instants.get(j - 1))))); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 70c128f907ee..a7f022ad0eb9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -152,7 +152,7 @@ public static Pair> insertFirstBigBatchForClientCle assertNoWriteErrors(statuses.collect()); // verify that there is a commit metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 070d4d0d325f..b0876d061037 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -162,7 +162,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -196,7 +196,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { assertTrue(deltaCommit.isPresent()); assertEquals("002", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 002"); - commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); allFiles = listAllBaseFilesInPath(hoodieTable); @@ -652,7 +652,7 @@ public void testHandleUpdateWithMultiplePartitions() throws Exception { assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java index 4f1c0a998059..cc29bbaf1baf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java @@ -270,7 +270,7 @@ public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception { // Then: 1 delta commit is done, the failed compaction is retried metaClient = createMetaClient(cfg.getBasePath()); assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } @Test @@ -308,7 +308,7 @@ public void testCompactionRetryOnFailureBasedOnTime() throws Exception { metaClient = createMetaClient(cfg.getBasePath()); // 2 delta commits at the beginning. 1 compaction, 1 delta commit following it. assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } @Test @@ -345,6 +345,6 @@ public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception { // Then: 1 delta commit is done, the failed compaction is retried metaClient = createMetaClient(cfg.getBasePath()); assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 67d618e616dd..4fd43a220577 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -289,7 +289,7 @@ private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfi //2. 
rollback HoodieInstant commitInstant; if (isUsingMarkers) { - commitInstant = table.getActiveTimeline().getCommitTimeline().filterInflights().lastInstant().get(); + commitInstant = table.getActiveTimeline().getCommitAndReplaceTimeline().filterInflights().lastInstant().get(); } else { commitInstant = table.getCompletedCommitTimeline().lastInstant().get(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 220ee9b87e7a..36dd49fa9841 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -291,7 +291,7 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -334,7 +334,7 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E assertTrue(deltaCommit.isPresent()); assertEquals("004", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 004"); - commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); allFiles = listAllBaseFilesInPath(hoodieTable); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 275d62bc4f3e..cb3d35081580 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -113,7 +113,7 @@ void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exc client.commit(newCommitTime, jsc().parallelize(statuses)); metaClient = HoodieTableMetaClient.reload(metaClient); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertTrue(commit.isPresent()); assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); @@ -189,7 +189,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro assertTrue(deltaCommit.isPresent()); assertEquals("000000001", deltaCommit.get().getTimestamp(), "Delta commit should be 000000001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -377,7 +377,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() 
throws Exception { assertEquals(200, getTotalRecordsWritten(instantCommitMetadataPairOpt.get().getValue())); Option commit = - metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 2c4715174d45..d52caf0ddce4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -530,7 +530,7 @@ private JavaRDD getWriteStatusAndVerifyDeleteOperation(String newCo // verify that there is a commit HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); if (assertForCommit) { assertEquals(3, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index e45578211cbe..79dda856367b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -289,7 +289,7 @@ protected Stream insertRecordsToMORTable(HoodieTableMetaClient m "Delta commit should be specified value"); Option commit = - reloadedMetaClient.getActiveTimeline().getCommitTimeline().lastInstant(); + reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -337,7 +337,7 @@ protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List commit = - reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index d835966e7517..62a79f2b0805 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -665,7 +665,7 @@ public boolean isTimelineNonEmpty() { public HoodieTimeline getCommitsTimeline() { switch (this.getTableType()) { case COPY_ON_WRITE: - return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); case MERGE_ON_READ: // We need to include the parquet files written out in delta commits // Include commit action to be able to start doing a MOR over a COW table - no @@ -685,7 +685,7 @@ public HoodieTimeline getCommitsTimeline() { public HoodieTimeline getCommitsAndCompactionTimeline() { switch (this.getTableType()) { case COPY_ON_WRITE: - 
return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); case MERGE_ON_READ: return getActiveTimeline().getWriteTimeline(); default: @@ -701,7 +701,7 @@ public HoodieTimeline getCommitTimeline() { case COPY_ON_WRITE: case MERGE_ON_READ: // We need to include the parquet files written out in delta commits in tagging - return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); default: throw new HoodieException("Unsupported table type :" + this.getTableType()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 71d63f16952f..a497eb17d86b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -330,13 +330,20 @@ public HoodieTimeline getAllCommitsTimeline() { } /** - * Get only pure commits (inflight and completed) in the active timeline. + * Get only pure commit and replace commits (inflight and completed) in the active timeline. */ - public HoodieTimeline getCommitTimeline() { + public HoodieTimeline getCommitAndReplaceTimeline() { //TODO: Make sure this change does not break existing functionality. return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, REPLACE_COMMIT_ACTION)); } + /** + * Get only pure commits (inflight and completed) in the active timeline. + */ + public HoodieTimeline getCommitTimeline() { + return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)); + } + /** * Get only the delta commits (inflight and completed) in the active timeline. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index 6b0ab50a3783..321fdd0fce5f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -285,8 +285,7 @@ public static List getPendingCompactionInstantTimes(HoodieTableMe */ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompaction( HoodieActiveTimeline activeTimeline) { - Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline() - .filterCompletedInstants().lastInstant(); + Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline().filterCompletedInstants().lastInstant(); HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline(); final HoodieInstant latestInstant; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index d41610f82d62..59605679c5d4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -753,7 +753,7 @@ public Option<String> getSyncedInstantTime() { @Override public Option<String> getLatestCompactionTime() { if (metadataMetaClient != null) { - Option<HoodieInstant> latestCompaction = metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + Option<HoodieInstant> latestCompaction = metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant(); if (latestCompaction.isPresent()) { return Option.of(latestCompaction.get().getTimestamp()); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index 9f3760b70d94..0ab72016893f 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -86,7 +86,7 @@ public void checkSerDe() { @Test public void checkCommitTimeline() { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty(), "Should be empty commit timeline"); HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1"); @@ -95,12 +95,12 @@ public void checkCommitTimeline() { // Commit timeline should not auto-reload every time getActiveCommitTimeline(), it should be cached activeTimeline = metaClient.getActiveTimeline(); - activeCommitTimeline = activeTimeline.getCommitTimeline(); + activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty(), "Should be empty commit timeline"); activeTimeline = activeTimeline.reload(); HoodieInstant completedInstant = activeTimeline.getCommitsTimeline().getInstantsAsStream().findFirst().get(); - activeCommitTimeline = activeTimeline.getCommitTimeline(); + activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertFalse(activeCommitTimeline.empty(), "Should be the 1 commit we made"); assertTrue(completedInstant.isCompleted());
assertTrue(completedInstant.getTimestamp().equals(instant.getTimestamp())); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index d258753c3a85..83cf0957d7fd 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -107,7 +107,7 @@ public void tearDown() throws Exception { @Test public void testGetPartitionsWithReplaceCommits() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String ts1 = "1"; @@ -146,7 +146,7 @@ public void testGetPartitionsWithReplaceCommits() throws IOException { @Test public void testGetPartitions() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String olderPartition = "0"; // older partitions that is modified by all cleans @@ -185,7 +185,7 @@ public void testGetPartitions() throws IOException { @Test public void testGetPartitionsUnPartitioned() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String partitionPath = ""; @@ -213,7 +213,7 @@ public void testGetPartitionsUnPartitioned() throws IOException { @Test public void testRestoreInstants() throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); for (int i = 1; i <= 5; i++) { @@ -238,7 +238,7 @@ public void testGetExtraMetadata() throws Exception { String extraMetadataKey = "test_key"; String extraMetadataValue1 = "test_value1"; HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent()); @@ -616,7 +616,7 @@ public void testHandleHollowCommitIfNeeded(HollowCommitHandling handlingMode) th @Test public void testGetDroppedPartitions() throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String olderPartition = "p1"; // older partitions that will be deleted by clean commit diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index f5bc72134304..1fd810b75582 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -121,12 +121,16 @@ public void testLoadingInstantsFromFiles() throws IOException { assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), timeline.getCommitTimeline().getInstantsAsStream(), "Check the instants stream"); + + assertStreamEquals( + Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), + timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check the instants stream"); assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete), - timeline.getCommitTimeline().filterCompletedInstants().getInstantsAsStream(), + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().getInstantsAsStream(), "Check the instants stream"); assertStreamEquals(Stream.of(instant5), - timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(), + timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(), "Check the instants stream"); // Backwards compatibility testing for reading compaction plans @@ -175,23 +179,23 @@ public void testTimelineOperations() { timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", "11", "13", "15", "17", "19"), Stream.of("21", "23")); assertStreamEquals(Stream.of("05", "07", "09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04", "11") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInRange("04", "11") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsInRange should return 4 instants"); assertStreamEquals(Stream.of("03", "05", "07", "09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsInClosedRange("03", "11") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInClosedRange("03", "11") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsInClosedRange should return 5 instants"); assertStreamEquals(Stream.of("09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2) + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsAfter("07", 2) .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsAfter 07 should return 2 instants"); assertStreamEquals(Stream.of("01", "03", "05"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsBefore("07") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsBefore("07") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsBefore 07 should return 3 instants"); assertFalse(timeline.empty()); - assertFalse(timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().empty()); + assertFalse(timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().empty()); assertEquals(12, timeline.countInstants()); assertEquals("01", timeline.firstInstant( HoodieTimeline.COMMIT_ACTION, State.COMPLETED).get().getTimestamp()); @@ -202,7 +206,7 @@ public void testTimelineOperations() { 
assertFalse(timeline.firstInstant( HoodieTimeline.REPLACE_COMMIT_ACTION, State.COMPLETED).isPresent()); - HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants(); + HoodieTimeline activeCommitTimeline = timeline.getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(10, activeCommitTimeline.countInstants()); assertEquals("01", activeCommitTimeline.firstInstant().get().getTimestamp()); @@ -347,7 +351,7 @@ public void testTimelineGetOperations() { HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); checkTimeline.accept(timeline.getWriteTimeline(), CollectionUtils.createSet( HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); - checkTimeline.accept(timeline.getCommitTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); + checkTimeline.accept(timeline.getCommitAndReplaceTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION)); checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION)); checkTimeline.accept(timeline.getRollbackTimeline(), Collections.singleton(HoodieTimeline.ROLLBACK_ACTION)); @@ -554,12 +558,12 @@ public void testFiltering() { public void testReplaceActionsTimeline() { int instantTime = 1; List allInstants = new ArrayList<>(); - HoodieInstant instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); - instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); - instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); + HoodieInstant instant1 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant1); + HoodieInstant instant2 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant2); + HoodieInstant instant3 = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant3); timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(allInstants); @@ -567,8 +571,16 @@ public void testReplaceActionsTimeline() { timeline.getCompletedReplaceTimeline().getInstants(); assertEquals(1, validReplaceInstants.size()); - assertEquals(instant.getTimestamp(), validReplaceInstants.get(0).getTimestamp()); + assertEquals(instant3.getTimestamp(), validReplaceInstants.get(0).getTimestamp()); assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, validReplaceInstants.get(0).getAction()); + + assertStreamEquals( + Stream.of(instant1, instant2, instant3), + timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check the instants stream"); + + assertStreamEquals( + Stream.of(instant1, instant2), + timeline.getCommitTimeline().getInstantsAsStream(), "Check the instants stream"); } @Test
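The assertions above pin down the contract this patch introduces: getCommitTimeline() now returns COMMIT actions only, while getCommitAndReplaceTimeline() returns COMMIT plus REPLACE_COMMIT actions. A minimal sketch of that contract, using only the getters exercised in the test (the helper class name is hypothetical):

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

class CommitTimelineContractSketch {
  // The commit-only view excludes clustering/insert_overwrite instants, so its
  // instant count is always <= the commit-and-replace view of the same timeline.
  static void check(HoodieActiveTimeline activeTimeline) {
    HoodieTimeline commitsOnly = activeTimeline.getCommitTimeline();
    HoodieTimeline commitsAndReplaces = activeTimeline.getCommitAndReplaceTimeline();
    assert commitsOnly.countInstants() <= commitsAndReplaces.countInstants();
  }
}

diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java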
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index e7174ec1ac51..a7c7164505d1 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -288,6 +288,59 @@ public void testGetDeltaCommitsSinceLatestCompaction(boolean hasCompletedCompact } } + @Test + public void testGetDeltaCommitsSinceLastCompactionWithCompletedReplaceCommits() { + // 4th replace commit. + HoodieActiveTimeline timeline = new MockHoodieActiveTimeline( + Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "05"), + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")).collect(Collectors.toList())); + + Pair actual = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get(); + assertEquals( + Stream.of( + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")) + .collect(Collectors.toList()), + actual.getLeft().getInstants()); + assertEquals( + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"), + actual.getRight()); + + // mix of compaction commit and replace commit. 
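+ // in the first scenario, the latest compaction anchor is the plain commit at "06";
+ // here the completed replace commit at "06" is not a compaction, so the anchor
+ // stays at the commit "05" and the delta commits since it are "07" through "09".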
+ timeline = new MockHoodieActiveTimeline( + Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"), + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"), + new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, "06"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")).collect(Collectors.toList())); + + actual = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get(); + assertEquals( + Stream.of( + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")) + .collect(Collectors.toList()), + actual.getLeft().getInstants()); + assertEquals( + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"), + actual.getRight()); + } + @Test public void testGetDeltaCommitsSinceLatestCompactionWithEmptyDeltaCommits() { HoodieActiveTimeline timeline = new MockHoodieActiveTimeline(); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala index 2319d40480e7..1f523aabc993 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala @@ -54,7 +54,7 @@ class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilde val metaClient = createMetaClient(jsc, tablePath) - val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp + val latestCommit: String = metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp val partitionPaths: util.List[String] = FSUtils.getAllPartitionFoldersThreeLevelsDown(metaClient.getStorage, tablePath); val basePath: StoragePath = new StoragePath(tablePath) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala index cfda38a446f2..deb62b607f36 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala @@ -72,7 +72,7 @@ class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBu metaClient.getStorage, partition) val baseFormatFile: Option[StoragePath] = HoodiePartitionMetadata.baseFormatMetaPathIfExists( metaClient.getStorage, partition) - val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp + val latestCommit: String = 
metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp var action = if (textFormatFile.isPresent) "MIGRATE" else "NONE" if (!dryRun) { if (!baseFormatFile.isPresent) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala index 22d0e423155c..1c5221081e25 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala @@ -66,7 +66,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil .withBasePath(basePath) .withLogFilePaths(logFilePaths.asJava) .withReaderSchema(schema) - .withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp) + .withLatestInstantTime(client.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp) .withReverseReader(java.lang.Boolean.parseBoolean(HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue)) .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue) .withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala index 36be3b146783..5556fd93b33e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala @@ -46,7 +46,7 @@ class StatsWriteAmplificationProcedure extends BaseProcedure with ProcedureBuild val basePath = getBasePath(table) val client = createMetaClient(jsc, basePath) val activeTimeline = client.getActiveTimeline - val timeline = activeTimeline.getCommitTimeline.filterCompletedInstants() + val timeline = activeTimeline.getCommitAndReplaceTimeline.filterCompletedInstants() val rows = new java.util.ArrayList[Row] val df = new DecimalFormat("#.00") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala index 10a101607459..57a17b213b88 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala @@ -190,7 +190,7 @@ class ValidateHoodieSyncProcedure extends BaseProcedure with ProcedureBuilder wi @throws[IOException] def countNewRecords(target: HoodieTableMetaClient, commitsToCatchup: List[String]): Long = { var totalNew: Long = 0 - val timeline: HoodieTimeline = target.reloadActiveTimeline.getCommitTimeline.filterCompletedInstants + val timeline: HoodieTimeline = 
target.reloadActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants for (commit <- commitsToCatchup) { val c: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get, classOf[HoodieCommitMetadata]) totalNew += c.fetchTotalRecordsWritten - c.fetchTotalUpdateRecordsWritten diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 086363e447ca..d02204dbe9b6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -202,9 +202,9 @@ public void run() throws Exception { HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient(jssc, tablePath); if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { // Ensure we have successfully completed one compaction commit - ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants() == 1); + ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants() == 1); } else { - ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants() >= 1); + ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants() >= 1); } // Deletes Stream diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala index 602a1a5db02c..7f557e7e1d59 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala @@ -181,6 +181,6 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness { } // compaction should have been completed val metaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(fs.getConf), basePath) - assertEquals(1, metaClient.getActiveTimeline.getCommitTimeline.countInstants()) + assertEquals(1, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.countInstants()) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 054744109b02..babe1f73acdd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -503,6 +503,6 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase { streamingWrite(inputDF.schema, sourcePath, destPath, opts, id) } val metaClient = HoodieTestUtils.createMetaClient(storage, destPath); - assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty()) + assertTrue(metaClient.getActiveTimeline.getCommitAndReplaceTimeline.empty()) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala index e4c072d9d1f5..cfc0735b2fd3 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala @@ -28,7 +28,6 @@ import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} import org.apache.hudi.config.HoodieWriteConfig - import org.apache.avro.generic.GenericRecord import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SaveMode} @@ -333,6 +332,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2)) inputDF4.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .mode(SaveMode.Append) .save(basePath) @@ -357,6 +357,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { .options(options) .option("hoodie.clustering.inline", "true") .option("hoodie.clustering.inline.max.commits", "1") + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) val instant5 = metaClient.reloadActiveTimeline.lastInstant().get() @@ -385,6 +386,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val inputDF6 = spark.read.json(spark.sparkContext.parallelize(records6, 2)) inputDF6.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) .mode(SaveMode.Append) .save(basePath) @@ -407,27 +409,32 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val inputDF7 = spark.read.json(spark.sparkContext.parallelize(records7, 2)) inputDF7.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) + totalInsertedCnt += 7 val records8 = recordsToStrings(dataGen.generateInserts("007", 3)).asScala.toList val inputDF8 = spark.read.json(spark.sparkContext.parallelize(records8, 2)) inputDF8.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) val instant8 = metaClient.reloadActiveTimeline.lastInstant().get() val commitTime8 = instant8.getTimestamp + totalInsertedCnt += 3 // 8. Upsert Operation With Clean Operation - val records9 = recordsToStrings(dataGen.generateUniqueUpdates("008", 30)).asScala.toList - val inputDF9 = spark.read.json(spark.sparkContext.parallelize(records9, 2)) + val inputDF9 = inputDF6.limit(30) // 30 updates to inserts added after the insert overwrite table operation. If not for this, updates generated from the data generator + // could be split as inserts and updates from Hudi's standpoint due to the insert overwrite table operation.
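+ // clean retention below is set higher than the number of commits this test
+ // produces, so the cleaner no longer trims the earliest commits and the
+ // full-range CDC read at the end stays consistent without resetting the
+ // running insert/update counters.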
inputDF9.write.format("org.apache.hudi") .options(options) .option("hoodie.clean.automatic", "true") - .option("hoodie.keep.min.commits", "4") - .option("hoodie.keep.max.commits", "5") - .option("hoodie.clean.commits.retained", "3") + .option("hoodie.keep.min.commits", "16") + .option("hoodie.keep.max.commits", "17") + .option("hoodie.clean.commits.retained", "15") + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) val instant9 = metaClient.reloadActiveTimeline.lastInstant().get() @@ -440,13 +447,8 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val updatedCnt9 = 30 - insertedCnt9 assertCDCOpCnt(cdcDataOnly9, insertedCnt9, updatedCnt9, 0) - // here cause we do the clean operation and just remain the commit6 and commit7, so we need to reset the total cnt. - // 70 is the number of inserted records at commit 6. - totalInsertedCnt = 80 + insertedCnt9 - totalUpdatedCnt = updatedCnt9 - totalDeletedCnt = 0 allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString) - assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt) + assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt + 30, totalDeletedCnt) } /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala index d56ee5758e19..154d9ecfb5ac 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala @@ -254,7 +254,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPath))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 3 files @@ -311,7 +311,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPathWithUpdates))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 2 files @@ -369,7 +369,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, 
metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPathWithUpserts))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 3 files @@ -427,7 +427,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPath))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 3 files diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index 6aebde9a443a..de1545779216 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -636,7 +636,7 @@ static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, S static void assertAtleastNCompactionCommits(int minExpected, String tablePath) { HoodieTableMetaClient meta = createMetaClient(storage, tablePath); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); int numCompactionCommits = timeline.countInstants(); assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); @@ -652,7 +652,7 @@ static void assertAtleastNDeltaCommits(int minExpected, String tablePath) { static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) { HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); int numCompactionCommits = timeline.countInstants(); assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 17d6afd5cb08..84dd59861146 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -857,7 +857,7 @@ public void testDeltaSyncWithPendingCompaction() throws Exception { // delete compaction commit HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient(storage, tableBasePath); - HoodieTimeline 
timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); HoodieInstant commitInstant = timeline.lastInstant().get(); String commitFileName = tableBasePath + "/.hoodie/" + commitInstant.getFileName(); fs.delete(new Path(commitFileName), false);
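Most call sites migrated in this patch follow the same pattern: wherever the newest completed write must be visible, including writes produced by clustering or insert_overwrite, the wider timeline is consulted before taking lastInstant(). A sketch of that pattern as a standalone helper (the class name is hypothetical; like the call sites above, it assumes the timeline is non-empty, so the Option resolves):

import org.apache.hudi.common.table.HoodieTableMetaClient;

class LatestInstantSketch {
  // Resolve the newest commit or replace-commit timestamp, e.g. for use as the
  // latest-instant ceiling when reading log file records.
  static String latestCommitOrReplaceTime(HoodieTableMetaClient metaClient) {
    return metaClient.getActiveTimeline()
        .getCommitAndReplaceTimeline()
        .lastInstant()   // Option<HoodieInstant>; empty on a fresh table
        .get()           // assumes at least one completed commit, as above
        .getTimestamp();
  }
}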