diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 1679a32700772..6a297e868e061 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -316,7 +316,7 @@ private static String printAllCompactions(HoodieDefaultTimeline timeline, .filter(pair -> pair.getRight() != null) .collect(Collectors.toList()); - Set committedInstants = timeline.getCommitTimeline().filterCompletedInstants() + Set committedInstants = timeline.getCommitAndReplaceTimeline().filterCompletedInstants() .getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); List rows = new ArrayList<>(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java index cbb2ae2177ca3..e9a3a3c922ac6 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java @@ -247,7 +247,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m HoodieTimeline timeline; if (basefileOnly) { - timeline = metaClient.getActiveTimeline().getCommitTimeline(); + timeline = metaClient.getActiveTimeline().getCommitAndReplaceTimeline(); } else if (excludeCompaction) { timeline = metaClient.getActiveTimeline().getCommitsTimeline(); } else { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 307ca81cea07d..b4c72021ee6ee 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -232,7 +232,7 @@ storage, new StoragePath(logFilePathPattern)).stream() .withReaderSchema(readerSchema) .withLatestInstantTime( client.getActiveTimeline() - .getCommitTimeline().lastInstant().get().getTimestamp()) + .getCommitAndReplaceTimeline().lastInstant().get().getTimestamp()) .withReverseReader( Boolean.parseBoolean( HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue())) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 8783e749057f9..2418976c4e451 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -118,7 +118,7 @@ public String addPartitionMeta( HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); String latestCommit = - client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp(); List partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.storage, client.getBasePath()); StoragePath basePath = client.getBasePathV2(); @@ -239,7 +239,7 @@ public String migratePartitionMeta( Option baseFormatFile = HoodiePartitionMetadata.baseFormatMetaPathIfExists(HoodieCLI.storage, partition); String latestCommit = - client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + 
client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp(); String[] row = new String[] { partitionPath, diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java index f8e60ba8cee14..9f859bf72bfc9 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java @@ -69,7 +69,7 @@ public String writeAmplificationStats( long totalRecordsWritten = 0; HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = activeTimeline.getCommitAndReplaceTimeline().filterCompletedInstants(); List rows = new ArrayList<>(); DecimalFormat df = new DecimalFormat("#.00"); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java index 21910fd956dfe..12322617fb2dd 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java @@ -36,7 +36,7 @@ public class CommitUtil { public static long countNewRecords(HoodieTableMetaClient metaClient, List commitsToCatchup) throws IOException { long totalNew = 0; - HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); for (String commit : commitsToCatchup) { HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes( timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(), diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java index c3bbbef0cf41c..87bb2b7d4064b 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java @@ -192,7 +192,7 @@ private void testRefreshCommand(String command) throws IOException { assertTrue(prepareTable()); HoodieTimeline timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(0, timeline.countInstants(), "There should have no instant at first"); // generate four savepoints @@ -203,14 +203,14 @@ private void testRefreshCommand(String command) throws IOException { // Before refresh, no instant timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(0, timeline.countInstants(), "there should have no instant"); Object result = shell.evaluate(() -> command); assertTrue(ShellEvaluationResultUtil.isSuccess(result)); timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); // After refresh, there are 4 instants assertEquals(4, timeline.countInstants(), "there should have 4 instants"); diff 
--git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java index 8f1d07b4eb561..ced1cf7a3ef00 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java @@ -137,7 +137,7 @@ public void testRollbackToSavepoint() throws IOException { assertEquals(1, timeline.getRestoreTimeline().countInstants()); // 103 instant had rollback - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103"))); } @@ -182,9 +182,9 @@ public void testRollbackToSavepointWithMetadataTableEnable() throws Exception { assertEquals(1, timeline.getRestoreTimeline().countInstants()); // 103 and 104 instant had rollback - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103"))); - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104"))); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java index 99b5d833f509b..6023b17ce0d26 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java @@ -143,7 +143,7 @@ public static Option loadMetadata(HoodieTable t && maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) { return loadMetadataFromGivenFile(table, maxMetadataFile); } - HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); // fix the in-consistency between un-committed and committed hashing metadata files. 
List fixed = new ArrayList<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index dd292830a85a5..46323954a5bbf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1330,7 +1330,7 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String late protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) { Option lastCompletedCompactionInstant = metadataMetaClient.reloadActiveTimeline() - .getCommitTimeline().filterCompletedInstants().lastInstant(); + .getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant(); if (lastCompletedCompactionInstant.isPresent() && metadataMetaClient.getActiveTimeline().filterCompletedInstants() .findInstantsAfter(lastCompletedCompactionInstant.get().getTimestamp()).countInstants() < 3) { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java index 8703ffb9de0c4..7084ae013e4fc 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java @@ -132,7 +132,7 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) // for new inserts, compute buckets depending on how many records we have for each partition Set partitionPaths = profile.getPartitionPaths(); long averageRecordSize = - averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(), config); LOG.info("AvgRecordSize => " + averageRecordSize); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 1c26fb820017b..d697c192221a6 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -1716,7 +1716,7 @@ public void testMetadataMultiWriter() throws Exception { assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); // Compaction may occur if the commits completed in order - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClients[0]); @@ -1763,7 +1763,7 @@ public void testMultiWriterForDoubleLocking() throws Exception { // 6 commits and 2 cleaner commits. 
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); } @@ -2034,7 +2034,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction // deltacommits (1 will be due to bootstrap) HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 0); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2044,7 +2044,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { client.startCommitWithTime(newCommitTime); client.insert(records, newCommitTime); metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2065,7 +2065,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure no more compactions took place due to the leftover inflight commit metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */); @@ -2080,7 +2080,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure compactions took place metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 2); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */)); assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0); @@ -2428,7 +2428,7 @@ public void testRepeatedActionWithSameInstantTime() throws Exception { client.upsert(records, newCommitTime); } } - assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3); + 
assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 3); try (HoodieJavaWriteClient client = new HoodieJavaWriteClient(engineContext, writeConfig)) { // Perform a clean diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java index 6f5352e2a34e1..0d4b77ec43d0a 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java @@ -520,7 +520,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, 0, 150); HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false); - List instants = activeTimeline.getCommitTimeline().getInstants(); + List instants = activeTimeline.getCommitAndReplaceTimeline().getInstants(); assertEquals(5, instants.size()); assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), instants.get(0)); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java index 430f8f01a5e24..1e43a4d384003 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java @@ -867,7 +867,7 @@ private List getWriteStatusAndVerifyDeleteOperation(String newCommi // verify that there is a commit HoodieTableMetaClient metaClient = createMetaClient(); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); if (assertForCommit) { assertEquals(3, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 007097a0a6cd3..230f684d165e2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -135,7 +135,7 @@ public void readLocalWriteHDFS() throws Exception { // Read from hdfs FileSystem fs = HadoopFSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultStorageConf()); HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(HadoopFSUtils.getStorageConf(fs.getConf()), dfsBasePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); Dataset readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime); assertEquals(readRecords.count(), records.size()); @@ -156,7 +156,7 @@ public void readLocalWriteHDFS() throws Exception { LOG.info("Reading from path: " + tablePath); fs = HadoopFSUtils.getFs(tablePath, HoodieTestUtils.getDefaultStorageConf()); metaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(fs.getConf()), tablePath); - timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); Dataset localReadRecords = HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime); assertEquals(localReadRecords.count(), localRecords.size()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index aeb0627744efc..9ed2dce3ce54a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -309,7 +309,7 @@ public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws Excep (String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, true, numRecords, 3 * numRecords, 6, false); // new commit - HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("006")); checkReadRecords("000", 3 * numRecords); @@ -333,7 +333,7 @@ public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws Excep private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException { if (tableType == HoodieTableType.COPY_ON_WRITE) { - HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline(); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline(); assertEquals(numExpectedRecords, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of(instantTime))); } else { // TODO: This code fails to read records under the following conditions: diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 30b1b63998d05..3dfb61c2ceac3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -2131,7 +2131,7 @@ public void testMetadataMultiWriter() throws Exception { assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); // Compaction may occur if the commits completed in order - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClients[0]); @@ -2179,7 +2179,7 @@ public void testMultiWriterForDoubleLocking() throws Exception { // 6 commits and 2 cleaner commits. 
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); } @@ -2444,7 +2444,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction // deltacommits (1 will be due to bootstrap) HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 0); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2454,7 +2454,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { client.startCommitWithTime(newCommitTime); client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2475,7 +2475,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure no more compactions took place due to the leftover inflight commit metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */); @@ -2490,7 +2490,7 @@ public void testCleaningArchivingAndCompaction() throws Exception { // Ensure compactions took place metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 2); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */)); assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0); @@ -3120,7 +3120,7 @@ public void testRepeatedActionWithSameInstantTime() throws Exception { client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); } } - assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3); + 
assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 3); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { // Perform a clean diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 0db85ae69c109..74e998349ea34 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -685,7 +685,7 @@ private void testUpsertsInternal(HoodieWriteConfig config, 0, 150); HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false); - List instants = activeTimeline.getCommitTimeline().getInstants(); + List instants = activeTimeline.getCommitAndReplaceTimeline().getInstants(); assertEquals(5, instants.size()); assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), instants.get(0)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index c451f4bd938e1..ad612ee5c9b98 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -121,7 +121,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap // verify that there is a commit metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 001"); @@ -147,7 +147,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap // verify that there are 2 commits metaClient = HoodieTableMetaClient.reload(metaClient); - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(2, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting two commits."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 002"); Dataset dataSet = getRecords(); @@ -167,7 +167,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap // verify that there are now 3 commits metaClient = HoodieTableMetaClient.reload(metaClient); - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(3, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting three commits."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 003"); dataSet = getRecords(); @@ -197,7 +197,7 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap assertNoWriteErrors(statuses); 
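// --- Illustrative sketch (commentary, not patch content): what this rename changes. ---
// Assuming a metaClient over a table whose timeline contains both "commit" and
// "replacecommit" instants, the two getters now differ:
//
//   HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
//   // commit + replacecommit instants (the old behavior of getCommitTimeline()):
//   HoodieTimeline commitAndReplace = timeline.getCommitAndReplaceTimeline();
//   // pure commit instants only (the new, narrower getCommitTimeline()):
//   HoodieTimeline commitOnly = timeline.getCommitTimeline();
//
// Hence every call site that must keep seeing clustering / insert-overwrite
// results (replacecommits), like the assertions in these tests, is migrated to
// getCommitAndReplaceTimeline().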
// verify there are now 4 commits - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(4, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting four commits."); assertEquals(timeline.lastInstant().get().getTimestamp(), newCommitTime, "Latest commit should be 004"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 723fa6b16141e..2de9f5d378487 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -154,7 +154,7 @@ public static Pair> insertFirstBigBatchForClientCle assertNoWriteErrors(statuses.collect()); // verify that there is a commit metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index f037f46a30934..9e1f4277c57f7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -161,7 +161,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -195,7 +195,7 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { assertTrue(deltaCommit.isPresent()); assertEquals("002", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 002"); - commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); allFiles = listAllBaseFilesInPath(hoodieTable); @@ -653,7 +653,7 @@ public void testHandleUpdateWithMultiplePartitions() throws Exception { assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java index 209d70e499a1b..f271356bcb902 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java @@ -270,7 +270,7 @@ public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception { // Then: 1 delta commit is done, the failed compaction is retried metaClient = createMetaClient(cfg.getBasePath()); assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } @Test @@ -308,7 +308,7 @@ public void testCompactionRetryOnFailureBasedOnTime() throws Exception { metaClient = createMetaClient(cfg.getBasePath()); // 2 delta commits at the beginning. 1 compaction, 1 delta commit following it. assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } @Test @@ -345,6 +345,6 @@ public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception { // Then: 1 delta commit is done, the failed compaction is retried metaClient = createMetaClient(cfg.getBasePath()); assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 00ff11b57d036..e78ed757e8fe3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -289,7 +289,7 @@ private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfi //2. 
rollback HoodieInstant commitInstant; if (isUsingMarkers) { - commitInstant = table.getActiveTimeline().getCommitTimeline().filterInflights().lastInstant().get(); + commitInstant = table.getActiveTimeline().getCommitAndReplaceTimeline().filterInflights().lastInstant().get(); } else { commitInstant = table.getCompletedCommitTimeline().lastInstant().get(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 8e85208af6fbd..dd1d6c2431a39 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -284,7 +284,7 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -327,7 +327,7 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E assertTrue(deltaCommit.isPresent()); assertEquals("004", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 004"); - commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); allFiles = listAllBaseFilesInPath(hoodieTable); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 10d26f8369822..c08026946c0ee 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -123,7 +123,7 @@ void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exc client.commit(newCommitTime, jsc().parallelize(statuses)); metaClient = HoodieTableMetaClient.reload(metaClient); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertTrue(commit.isPresent()); assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); @@ -199,7 +199,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro assertTrue(deltaCommit.isPresent()); assertEquals("000000001", deltaCommit.get().getTimestamp(), "Delta commit should be 000000001"); - Option commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -505,7 +505,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() 
throws Exception { assertEquals(200, getTotalRecordsWritten(instantCommitMetadataPairOpt.get().getValue())); Option commit = - metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 09aff48224de9..b41c15a9898f8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -529,7 +529,7 @@ private JavaRDD getWriteStatusAndVerifyDeleteOperation(String newCo // verify that there is a commit HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); if (assertForCommit) { assertEquals(3, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index e45578211cbe7..79dda856367bf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -289,7 +289,7 @@ protected Stream insertRecordsToMORTable(HoodieTableMetaClient m "Delta commit should be specified value"); Option commit = - reloadedMetaClient.getActiveTimeline().getCommitTimeline().lastInstant(); + reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant(); assertFalse(commit.isPresent()); List allFiles = listAllBaseFilesInPath(hoodieTable); @@ -337,7 +337,7 @@ protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List commit = - reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 319cbdfbb4a3e..436a8c221feab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -547,7 +547,7 @@ public boolean isTimelineNonEmpty() { public HoodieTimeline getCommitsTimeline() { switch (this.getTableType()) { case COPY_ON_WRITE: - return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); case MERGE_ON_READ: // We need to include the parquet files written out in delta commits // Include commit action to be able to start doing a MOR over a COW table - no @@ -567,7 +567,7 @@ public HoodieTimeline getCommitsTimeline() { public HoodieTimeline getCommitsAndCompactionTimeline() { switch (this.getTableType()) { case COPY_ON_WRITE: - 
return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); case MERGE_ON_READ: return getActiveTimeline().getWriteTimeline(); default: @@ -583,7 +583,7 @@ public HoodieTimeline getCommitTimeline() { case COPY_ON_WRITE: case MERGE_ON_READ: // We need to include the parquet files written out in delta commits in tagging - return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); default: throw new HoodieException("Unsupported table type :" + this.getTableType()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 68cf428d36460..12ea0085d51c0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -318,13 +318,20 @@ public HoodieTimeline getAllCommitsTimeline() { } /** - * Get only pure commits (inflight and completed) in the active timeline. + * Get only pure commit and replace commits (inflight and completed) in the active timeline. */ - public HoodieTimeline getCommitTimeline() { + public HoodieTimeline getCommitAndReplaceTimeline() { //TODO: Make sure this change does not break existing functionality. return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, REPLACE_COMMIT_ACTION)); } + /** + * Get only pure commits (inflight and completed) in the active timeline. + */ + public HoodieTimeline getCommitTimeline() { + return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)); + } + /** * Get only the delta commits (inflight and completed) in the active timeline. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index 0f41f1314e1f7..4ef30a2656a82 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -285,8 +285,7 @@ public static List getPendingCompactionInstantTimes(HoodieTableMe */ public static Option> getDeltaCommitsSinceLatestCompaction( HoodieActiveTimeline activeTimeline) { - Option lastCompaction = activeTimeline.getCommitTimeline() - .filterCompletedInstants().lastInstant(); + Option lastCompaction = activeTimeline.getCommitTimeline().filterCompletedInstants().lastInstant(); HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline(); HoodieInstant latestInstant; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index efdb1baf23d2c..2cb42af683b4a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -593,7 +593,7 @@ public Option getSyncedInstantTime() { @Override public Option getLatestCompactionTime() { if (metadataMetaClient != null) { - Option latestCompaction = metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + Option latestCompaction = metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant(); if (latestCompaction.isPresent()) { return Option.of(latestCompaction.get().getTimestamp()); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index 9bbc72289f5c2..0b90889cfa7be 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -86,7 +86,7 @@ public void checkSerDe() { @Test public void checkCommitTimeline() { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty(), "Should be empty commit timeline"); HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1"); @@ -95,12 +95,12 @@ public void checkCommitTimeline() { // Commit timeline should not auto-reload every time getActiveCommitTimeline(), it should be cached activeTimeline = metaClient.getActiveTimeline(); - activeCommitTimeline = activeTimeline.getCommitTimeline(); + activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty(), "Should be empty commit timeline"); - HoodieInstant completedInstant = HoodieTimeline.getCompletedInstant(instant); activeTimeline = activeTimeline.reload(); - activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieInstant completedInstant = activeTimeline.getCommitsTimeline().getInstantsAsStream().findFirst().get(); + activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertFalse(activeCommitTimeline.empty(), "Should be the 1 commit we made"); 
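// --- Illustrative sketch (commentary, not patch content): why CompactionUtils keeps the
// narrowed getCommitTimeline(). On a merge-on-read timeline a completed compaction
// surfaces as a "commit" instant, while clustering / insert-overwrite produce
// "replacecommit" instants; taking the last pure commit therefore finds the latest real
// compaction even when a replacecommit is more recent. Mirroring the second scenario in
// the new TestCompactionUtils case below (commit at "05", replacecommit at "06", delta
// commits through "09"):
//
//   Option<Pair<HoodieTimeline, HoodieInstant>> res =
//       CompactionUtils.getDeltaCommitsSinceLatestCompaction(activeTimeline);
//   res.get().getRight();              // the "commit" at 05, not the "replacecommit" at 06
//   res.get().getLeft().getInstants(); // the delta commits 07, 08, 09 that follow it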
assertEquals(completedInstant, activeCommitTimeline.getInstantsAsStream().findFirst().get(), "Commit should be 1"); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index eef515c6ada8a..588fc114a3e8c 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -107,7 +107,7 @@ public void tearDown() throws Exception { @Test public void testGetPartitionsWithReplaceCommits() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String ts1 = "1"; @@ -146,7 +146,7 @@ public void testGetPartitionsWithReplaceCommits() throws IOException { @Test public void testGetPartitions() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String olderPartition = "0"; // older partitions that is modified by all cleans @@ -185,7 +185,7 @@ public void testGetPartitions() throws IOException { @Test public void testGetPartitionsUnPartitioned() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String partitionPath = ""; @@ -213,7 +213,7 @@ public void testGetPartitionsUnPartitioned() throws IOException { @Test public void testRestoreInstants() throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); for (int i = 1; i <= 5; i++) { @@ -238,7 +238,7 @@ public void testGetExtraMetadata() throws Exception { String extraMetadataKey = "test_key"; String extraMetadataValue1 = "test_value1"; HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent()); @@ -616,7 +616,7 @@ public void testHandleHollowCommitIfNeeded(HollowCommitHandling handlingMode) th @Test public void testGetDroppedPartitions() throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String olderPartition = "p1"; // older partitions that will be deleted by clean commit diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index fa2d7558ef573..1d4be5f02c8ac 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -120,12 +120,16 @@ public void testLoadingInstantsFromFiles() throws IOException { assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), timeline.getCommitTimeline().getInstantsAsStream(), "Check the instants stream"); + + assertStreamEquals( + Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), + timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check the instants stream"); assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete), - timeline.getCommitTimeline().filterCompletedInstants().getInstantsAsStream(), + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().getInstantsAsStream(), "Check the instants stream"); assertStreamEquals(Stream.of(instant5), - timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(), + timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(), "Check the instants stream"); // Backwards compatibility testing for reading compaction plans @@ -174,23 +178,23 @@ public void testTimelineOperations() { timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", "11", "13", "15", "17", "19"), Stream.of("21", "23")); assertStreamEquals(Stream.of("05", "07", "09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04", "11") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInRange("04", "11") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsInRange should return 4 instants"); assertStreamEquals(Stream.of("03", "05", "07", "09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsInClosedRange("03", "11") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInClosedRange("03", "11") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsInClosedRange should return 5 instants"); assertStreamEquals(Stream.of("09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2) + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsAfter("07", 2) .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsAfter 07 should return 2 instants"); assertStreamEquals(Stream.of("01", "03", "05"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsBefore("07") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsBefore("07") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsBefore 07 should return 3 instants"); assertFalse(timeline.empty()); - assertFalse(timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().empty()); + assertFalse(timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().empty()); assertEquals(12, timeline.countInstants()); assertEquals("01", timeline.firstInstant( HoodieTimeline.COMMIT_ACTION, State.COMPLETED).get().getTimestamp()); @@ -201,7 +205,7 @@ public void testTimelineOperations() { 
assertFalse(timeline.firstInstant( HoodieTimeline.REPLACE_COMMIT_ACTION, State.COMPLETED).isPresent()); - HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants(); + HoodieTimeline activeCommitTimeline = timeline.getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(10, activeCommitTimeline.countInstants()); assertEquals("01", activeCommitTimeline.firstInstant().get().getTimestamp()); @@ -346,7 +350,7 @@ public void testTimelineGetOperations() { HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); checkTimeline.accept(timeline.getWriteTimeline(), CollectionUtils.createSet( HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); - checkTimeline.accept(timeline.getCommitTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); + checkTimeline.accept(timeline.getCommitAndReplaceTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION)); checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION)); checkTimeline.accept(timeline.getRollbackTimeline(), Collections.singleton(HoodieTimeline.ROLLBACK_ACTION)); @@ -551,12 +555,12 @@ public void testFiltering() { public void testReplaceActionsTimeline() { int instantTime = 1; List allInstants = new ArrayList<>(); - HoodieInstant instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); - instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); - instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); + HoodieInstant instant1 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant1); + HoodieInstant instant2 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant2); + HoodieInstant instant3 = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant3); timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(allInstants); @@ -564,8 +568,16 @@ public void testReplaceActionsTimeline() { timeline.getCompletedReplaceTimeline().getInstants(); assertEquals(1, validReplaceInstants.size()); - assertEquals(instant.getTimestamp(), validReplaceInstants.get(0).getTimestamp()); + assertEquals(instant3.getTimestamp(), validReplaceInstants.get(0).getTimestamp()); assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, validReplaceInstants.get(0).getAction()); + + assertStreamEquals( + Stream.of(instant1, instant2, instant3), + timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check the instants stream"); + + assertStreamEquals( + Stream.of(instant1, instant2), + timeline.getCommitTimeline().getInstantsAsStream(), "Check the instants stream"); } @Test diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index 4741cdef1f81b..407251c64b215 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -291,6 +291,59 @@ public void testGetDeltaCommitsSinceLatestCompaction(boolean hasCompletedCompact
     }
   }
 
+  @Test
+  public void testGetDeltaCommitsSinceLastCompactionWithCompletedReplaceCommits() {
+    // latest completed compaction is the commit action at 06; delta commits follow it.
+    HoodieActiveTimeline timeline = new MockHoodieActiveTimeline(
+        Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+            new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")));
+
+    Pair<HoodieTimeline, HoodieInstant> actual =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get();
+    assertEquals(
+        Stream.of(
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+            .collect(Collectors.toList()),
+        actual.getLeft().getInstants());
+    assertEquals(
+        new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"),
+        actual.getRight());
+
+    // mix of compaction commit and replace commit.
+    timeline = new MockHoodieActiveTimeline(
+        Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+            new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"),
+            new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, "06"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")));
+
+    actual =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get();
+    assertEquals(
+        Stream.of(
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+            new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+            .collect(Collectors.toList()),
+        actual.getLeft().getInstants());
+    assertEquals(
+        new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"),
+        actual.getRight());
+  }
+
   @Test
   public void testGetDeltaCommitsSinceLatestCompactionWithEmptyDeltaCommits() {
     HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
@@ -386,6 +439,11 @@ public MockHoodieActiveTimeline() {
       this.setInstants(new ArrayList<>());
     }
 
+    public MockHoodieActiveTimeline(Stream<HoodieInstant> instants) {
+      super();
+      setInstants(instants.collect(Collectors.toList()));
+    }
+
     public MockHoodieActiveTimeline(
         Stream<String> completedDeltaCommits,
         Stream<String> completedCompactionCommits,
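The new test fixes the contract of CompactionUtils.getDeltaCommitsSinceLatestCompaction on timelines that contain replace commits: the baseline is the latest completed compaction (COMMIT action on a MOR table), so in the second scenario the replace commit at 06 does not reset the window and the deltas are still counted from commit 05. A hedged usage sketch; the wrapper class and printout are illustrative:

```java
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

final class DeltaCommitInspector {
  // Returns the delta commits that accumulated after the latest completed
  // compaction; the Option is empty when there are no delta commits at all.
  static void inspect(HoodieActiveTimeline activeTimeline) {
    Option<Pair<HoodieTimeline, HoodieInstant>> result =
        CompactionUtils.getDeltaCommitsSinceLatestCompaction(activeTimeline);
    if (result.isPresent()) {
      HoodieTimeline deltaCommits = result.get().getLeft();     // e.g. 07, 08, 09 above
      HoodieInstant latestCompaction = result.get().getRight(); // e.g. commit 05 or 06 above
      System.out.println(deltaCommits.countInstants() + " delta commits since "
          + latestCompaction.getTimestamp());
    }
  }
}
```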
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
index 2319d40480e70..1f523aabc9938 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
@@ -54,7 +54,7 @@ class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilde
 
     val metaClient = createMetaClient(jsc, tablePath)
 
-    val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
+    val latestCommit: String = metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp
     val partitionPaths: util.List[String] = FSUtils.getAllPartitionFoldersThreeLevelsDown(metaClient.getStorage, tablePath);
     val basePath: StoragePath = new StoragePath(tablePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index b9f43e12e661b..292f6d5fdee54 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -72,7 +72,7 @@ class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBu
         metaClient.getStorage, partition)
       val baseFormatFile: Option[StoragePath] = HoodiePartitionMetadata.baseFormatMetaPathIfExists(
         metaClient.getStorage, partition)
-      val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp
+      val latestCommit: String = metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp
       var action = if (textFormatFile.isPresent) "MIGRATE" else "NONE"
       if (!dryRun) {
         if (!baseFormatFile.isPresent) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 4afa328b84a7d..1a025042f9ba7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -68,7 +68,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
       .withBasePath(basePath)
       .withLogFilePaths(logFilePaths.asJava)
       .withReaderSchema(schema)
-      .withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp)
+      .withLatestInstantTime(client.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp)
       .withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
       .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue)
       .withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
index 36be3b146783f..5556fd93b33eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala
@@ -46,7 +46,7 @@ class StatsWriteAmplificationProcedure extends BaseProcedure with ProcedureBuild
     val basePath = getBasePath(table)
     val client = createMetaClient(jsc, basePath)
     val activeTimeline = client.getActiveTimeline
-    val timeline = activeTimeline.getCommitTimeline.filterCompletedInstants()
+    val timeline = activeTimeline.getCommitAndReplaceTimeline.filterCompletedInstants()
     val rows = new java.util.ArrayList[Row]
     val df = new DecimalFormat("#.00")
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
index 10a101607459f..57a17b213b880 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala
@@ -190,7 +190,7 @@ class ValidateHoodieSyncProcedure extends BaseProcedure with ProcedureBuilder wi
   @throws[IOException]
   def countNewRecords(target: HoodieTableMetaClient, commitsToCatchup: List[String]): Long = {
     var totalNew: Long = 0
-    val timeline: HoodieTimeline = target.reloadActiveTimeline.getCommitTimeline.filterCompletedInstants
+    val timeline: HoodieTimeline = target.reloadActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants
     for (commit <- commitsToCatchup) {
       val c: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get,
         classOf[HoodieCommitMetadata])
       totalNew += c.fetchTotalRecordsWritten - c.fetchTotalUpdateRecordsWritten
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 086363e447ca1..d02204dbe9b6f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -202,9 +202,9 @@ public void run() throws Exception {
     HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient(jssc, tablePath);
     if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       // Ensure we have successfully completed one compaction commit
-      ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants() == 1);
+      ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants() == 1);
     } else {
-      ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants() >= 1);
+      ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants() >= 1);
     }
 
     // Deletes Stream
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index ad017a5a4dc64..6e9e2a0a4815d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -177,6 +177,6 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     }
     // compaction should have been completed
     val metaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(fs.getConf), basePath)
-    assertEquals(1, metaClient.getActiveTimeline.getCommitTimeline.countInstants())
+    assertEquals(1, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.countInstants())
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 054744109b029..babe1f73acddc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -503,6 +503,6 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase {
       streamingWrite(inputDF.schema, sourcePath, destPath, opts, id)
     }
     val metaClient = HoodieTestUtils.createMetaClient(storage, destPath);
-    assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty())
+    assertTrue(metaClient.getActiveTimeline.getCommitAndReplaceTimeline.empty())
   }
 }
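countNewRecords in ValidateHoodieSyncProcedure mirrors the Java CommitUtil helper: for every commit to catch up, the number of new records is the total records written minus the update records. A minimal Java sketch of that arithmetic, using the HoodieCommitMetadata getters already called above (the wrapper class is illustrative):

```java
import org.apache.hudi.common.model.HoodieCommitMetadata;

final class NewRecordCounter {
  // A commit that writes 100 records of which 30 are updates contributes 70 new records.
  static long countNew(HoodieCommitMetadata commitMetadata) {
    return commitMetadata.fetchTotalRecordsWritten()
        - commitMetadata.fetchTotalUpdateRecordsWritten();
  }
}
```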
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
index cad585b645336..2da80c888dd93 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -28,7 +28,6 @@ import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
 import org.apache.hudi.config.HoodieWriteConfig
-
 import org.apache.avro.generic.GenericRecord
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SaveMode}
@@ -333,6 +332,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2))
     inputDF4.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -357,6 +357,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
       .options(options)
       .option("hoodie.clustering.inline", "true")
       .option("hoodie.clustering.inline.max.commits", "1")
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
     val instant5 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -385,6 +386,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val inputDF6 = spark.read.json(spark.sparkContext.parallelize(records6, 2))
     inputDF6.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
       .mode(SaveMode.Append)
       .save(basePath)
@@ -407,27 +409,32 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val inputDF7 = spark.read.json(spark.sparkContext.parallelize(records7, 2))
     inputDF7.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
 
+    totalInsertedCnt += 7
     val records8 = recordsToStrings(dataGen.generateInserts("007", 3)).asScala.toList
     val inputDF8 = spark.read.json(spark.sparkContext.parallelize(records8, 2))
     inputDF8.write.format("org.apache.hudi")
       .options(options)
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
     val instant8 = metaClient.reloadActiveTimeline.lastInstant().get()
     val commitTime8 = instant8.getTimestamp
+    totalInsertedCnt += 3
 
     // 8. Upsert Operation With Clean Operation
-    val records9 = recordsToStrings(dataGen.generateUniqueUpdates("008", 30)).asScala.toList
-    val inputDF9 = spark.read.json(spark.sparkContext.parallelize(records9, 2))
+    val inputDF9 = inputDF6.limit(30) // 30 updates to records inserted after the insert overwrite table; without this, updates generated by the datagen
+    // could be split into inserts and updates from Hudi's standpoint due to the insert overwrite table operation.
     inputDF9.write.format("org.apache.hudi")
       .options(options)
       .option("hoodie.clean.automatic", "true")
-      .option("hoodie.keep.min.commits", "4")
-      .option("hoodie.keep.max.commits", "5")
-      .option("hoodie.cleaner.commits.retained", "3")
+      .option("hoodie.keep.min.commits", "16")
+      .option("hoodie.keep.max.commits", "17")
+      .option("hoodie.cleaner.commits.retained", "15")
+      .option("hoodie.compact.inline", "false")
       .mode(SaveMode.Append)
       .save(basePath)
     val instant9 = metaClient.reloadActiveTimeline.lastInstant().get()
@@ -440,13 +447,8 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase {
     val updatedCnt9 = 30 - insertedCnt9
     assertCDCOpCnt(cdcDataOnly9, insertedCnt9, updatedCnt9, 0)
 
-    // here cause we do the clean operation and just remain the commit6 and commit7, so we need to reset the total cnt.
-    // 70 is the number of inserted records at commit 6.
-    totalInsertedCnt = 80 + insertedCnt9
-    totalUpdatedCnt = updatedCnt9
-    totalDeletedCnt = 0
     allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString)
-    assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt)
+    assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt + 30, totalDeletedCnt)
   }
 
   /**
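The recurring hoodie.compact.inline=false option appears intended to keep inline MOR compaction from adding extra commits to the timeline mid-test, which would perturb the CDC operation counts asserted above (this rationale is inferred, not stated in the diff). A sketch of the write pattern in Java; df, options, and basePath stand in for the test's values:

```java
import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

final class CdcWriteExample {
  // Hypothetical Java equivalent of the Scala writes above.
  static void append(Dataset<Row> df, Map<String, String> options, String basePath) {
    df.write().format("org.apache.hudi")
        .options(options)
        .option("hoodie.compact.inline", "false") // keep compaction commits out of the CDC window
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```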
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index 5675ac4ebe9c6..672f3308765f2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -254,7 +254,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
        metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPath)))
       val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 3 files
@@ -311,7 +311,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
        metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPathWithUpdates)))
       val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 2 files
@@ -369,7 +369,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
        metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPathWithUpserts)))
       val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 3 files
@@ -427,7 +427,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
       metaClient = HoodieTableMetaClient.reload(metaClient)
 
       // get fs and check number of latest files
-      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants,
+      val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants,
        metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPath)))
       val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList
       // there should be 3 files
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index e28b5bdec5927..51a8d26754a63 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -645,7 +645,7 @@ static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, S
 
   static void assertAtleastNCompactionCommits(int minExpected, String tablePath) {
     HoodieTableMetaClient meta = createMetaClient(storage, tablePath);
-    HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
     LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
     int numCompactionCommits = timeline.countInstants();
     assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
@@ -661,7 +661,7 @@ static void assertAtleastNDeltaCommits(int minExpected, String tablePath) {
 
   static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) {
     HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
-    HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
+    HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
     LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
     int numCompactionCommits = timeline.countInstants();
     assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index cb30d3dc0bee7..4da6ef51b627f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -875,7 +875,7 @@ public void testDeltaSyncWithPendingCompaction() throws Exception {
 
     // delete compaction commit
     HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient(storage, tableBasePath);
-    HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
     HoodieInstant commitInstant = timeline.lastInstant().get();
     String commitFileName = tableBasePath + "/.hoodie/" + commitInstant.getFileName();
     fs.delete(new Path(commitFileName), false);
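One caveat carried over unchanged by this PR: the CLI commands and procedures above dereference getCommitAndReplaceTimeline.lastInstant.get without a guard, which throws on a table that has no completed commit or replace commit yet. A hedged defensive variant; the helper name and null convention are illustrative, not part of the change:

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

final class TimelineSafety {
  // Returns the latest completed commit/replace timestamp, or null on an empty timeline.
  static String latestCommitTime(HoodieTableMetaClient metaClient) {
    Option<HoodieInstant> last = metaClient.getActiveTimeline()
        .getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant();
    return last.isPresent() ? last.get().getTimestamp() : null;
  }
}
```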