From cbcbc5182f524886946fdefec86faf75110f35c5 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 1 Apr 2024 11:38:29 +0530 Subject: [PATCH] [HUDI-7557] Fix incremental cleaner when commit for savepoint removed --- .../hudi/table/action/clean/CleanPlanner.java | 1 + .../hudi/table/action/TestCleanPlanner.java | 89 +++++++++++-------- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 48ec8f9baa1e..753f8c8253d5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -245,6 +245,7 @@ private List getPartitionsFromDeletedSavepoint(HoodieCleanMetadata clean Option instantOption = hoodieTable.getCompletedCommitsTimeline().filter(instant -> instant.getTimestamp().equals(savepointCommit)).firstInstant(); if (!instantOption.isPresent()) { LOG.warn("Skipping to process a commit for which savepoint was removed as the instant moved to archived timeline already"); + return Stream.empty(); } HoodieInstant instant = instantOption.get(); return getPartitionsForInstants(instant); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java index 8052572fcea9..9989273b723f 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java @@ -138,14 +138,14 @@ void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant, List partitionsInLastClean, Map> savepointsTrackedInLastClean, Map> activeInstantsPartitions, - Map> savepoints, List expectedPartitions) throws IOException { + Map> savepoints, List expectedPartitions, boolean areCommitsForSavepointsRemoved) throws IOException { HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class); when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline); // setup savepoint mocks Set savepointTimestamps = savepoints.keySet().stream().collect(Collectors.toSet()); when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps); if (!savepoints.isEmpty()) { - for (Map.Entry> entry: savepoints.entrySet()) { + for (Map.Entry> entry : savepoints.entrySet()) { Pair> savepointMetadataOptionPair = getSavepointMetadata(entry.getValue()); HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, entry.getKey()); when(activeTimeline.getInstantDetails(instant)).thenReturn(savepointMetadataOptionPair.getRight()); @@ -156,7 +156,7 @@ void testPartitionsForIncrCleaning(HoodieWriteConfig config, String earliestInst Pair> cleanMetadataOptionPair = getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, earliestInstantsInLastClean, lastCompletedTimeInLastClean, savepointsTrackedInLastClean.keySet()); mockLastCleanCommit(mockHoodieTable, lastCleanInstant, earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair); - mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, savepointsTrackedInLastClean); + mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, savepointsTrackedInLastClean, areCommitsForSavepointsRemoved); // Trigger clean and validate partitions to clean. CleanPlanner cleanPlanner = new CleanPlanner<>(context, mockHoodieTable, config); @@ -332,7 +332,7 @@ static Stream keepLatestByHoursOrCommitsArgs() { static Stream keepLatestByHoursOrCommitsArgsIncrCleanPartitions() { String earliestInstant = "20231204194919610"; - String earliestInstantPlusTwoDays = "20231206194919610"; + String earliestInstantPlusTwoDays = "20231206194919610"; String lastCleanInstant = earliestInstantPlusTwoDays; String earliestInstantMinusThreeDays = "20231201194919610"; String earliestInstantMinusFourDays = "20231130194919610"; @@ -340,9 +340,9 @@ static Stream keepLatestByHoursOrCommitsArgsIncrCleanPartitions() { String earliestInstantMinusSixDays = "20231128194919610"; String earliestInstantInLastClean = earliestInstantMinusSixDays; String lastCompletedInLastClean = earliestInstantMinusSixDays; - String earliestInstantMinusOneWeek = "20231127194919610"; + String earliestInstantMinusOneWeek = "20231127194919610"; String savepoint2 = earliestInstantMinusOneWeek; - String earliestInstantMinusOneMonth = "20231104194919610"; + String earliestInstantMinusOneMonth = "20231104194919610"; String savepoint3 = earliestInstantMinusOneMonth; List threePartitionsInActiveTimeline = Arrays.asList(PARTITION1, PARTITION2, PARTITION3); @@ -360,66 +360,74 @@ static Stream keepLatestByHoursOrCommitsArgsIncrCleanPartitions() { List arguments = new ArrayList<>(); // no savepoints tracked in last clean and no additional savepoints. all partitions in uncleaned instants should be expected - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.emptyMap(), - activeInstantsPartitionsMap3, Collections.emptyMap(), threePartitionsInActiveTimeline)); + activeInstantsPartitionsMap3, Collections.emptyMap(), threePartitionsInActiveTimeline, false)); // a new savepoint is added after last clean. but rest of uncleaned touches all partitions, and so all partitions are expected - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.emptyMap(), - activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline)); + activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline, false)); // previous clean tracks a savepoint which exists in timeline still. only 2 partitions are touched by uncleaned instants. only 2 partitions are expected - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), - activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline)); + activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false)); // savepoint tracked in previous clean was removed(touching partition1). latest uncleaned touched 2 other partitions. So, in total 3 partitions are expected. - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), - activeInstantsPartitionsMap2, Collections.emptyMap(), threePartitionsInActiveTimeline)); + activeInstantsPartitionsMap2, Collections.emptyMap(), threePartitionsInActiveTimeline, false)); // previous savepoint still exists and touches partition1. uncleaned touches only partition2 and partition3. expected partition2 and partition3. - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), - activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline)); + activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false)); // a new savepoint was added compared to previous clean. all 2 partitions are expected since uncleaned commits touched just 2 partitions. Map> latestSavepoints = new HashMap<>(); latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1)); latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION1)); - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), - activeInstantsPartitionsMap2, latestSavepoints, twoPartitionsInActiveTimeline)); + activeInstantsPartitionsMap2, latestSavepoints, twoPartitionsInActiveTimeline, false)); // 2 savepoints were tracked in previous clean. one of them is removed in latest. A partition which was part of the removed savepoint should be added in final // list of partitions to clean Map> previousSavepoints = new HashMap<>(); latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1)); latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION2)); - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), - previousSavepoints, activeInstantsPartitionsMap2, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), twoPartitionsInActiveTimeline)); + previousSavepoints, activeInstantsPartitionsMap2, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), twoPartitionsInActiveTimeline, false)); // 2 savepoints were tracked in previous clean. one of them is removed in latest. But a partition part of removed savepoint is already touched by uncleaned commits. // so we expect all 3 partitions to be in final list. - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), - previousSavepoints, activeInstantsPartitionsMap3, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), threePartitionsInActiveTimeline)); + previousSavepoints, activeInstantsPartitionsMap3, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), threePartitionsInActiveTimeline, false)); // unpartitioned test case. savepoint removed. List unPartitionsInActiveTimeline = Arrays.asList(StringUtils.EMPTY_STRING); Map> activeInstantsUnPartitionsMap = new HashMap<>(); activeInstantsUnPartitionsMap.put(earliestInstantMinusThreeDays, unPartitionsInActiveTimeline); - arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases( + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(StringUtils.EMPTY_STRING), Collections.singletonMap(savepoint2, Collections.singletonList(StringUtils.EMPTY_STRING)), - activeInstantsUnPartitionsMap, Collections.emptyMap(), unPartitionsInActiveTimeline)); + activeInstantsUnPartitionsMap, Collections.emptyMap(), unPartitionsInActiveTimeline, false)); + + // savepoint tracked in previous clean was removed(touching partition1). active instants does not have the instant corresponding to the savepoint. + // latest uncleaned touched 2 other partitions. So, in total 2 partitions are expected. + activeInstantsPartitionsMap2.remove(earliestInstantMinusOneWeek); + arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases( + earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), + Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), + activeInstantsPartitionsMap2, Collections.emptyMap(), twoPartitionsInActiveTimeline, true)); return arguments.stream(); } @@ -450,19 +458,20 @@ private static List buildArgumentsForCleanByHoursAndCommitsCases(Stri } // helper to build common cases for the two policies - private static List buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(String earliestInstant, - String latestCompletedInLastClean, - String lastKnownCleanInstantTime, - String earliestInstantInLastClean, - List partitionsInLastClean, - Map> savepointsTrackedInLastClean, - Map> activeInstantsToPartitionsMap, - Map> savepoints, - List expectedPartitions) { + private static List buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(String earliestInstant, + String latestCompletedInLastClean, + String lastKnownCleanInstantTime, + String earliestInstantInLastClean, + List partitionsInLastClean, + Map> savepointsTrackedInLastClean, + Map> activeInstantsToPartitionsMap, + Map> savepoints, + List expectedPartitions, + boolean areCommitsForSavepointsRemoved) { return Arrays.asList(Arguments.of(getCleanByHoursConfig(), earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime, - earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions), + earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions, areCommitsForSavepointsRemoved), Arguments.of(getCleanByCommitsConfig(), earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime, - earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions)); + earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions, areCommitsForSavepointsRemoved)); } private static HoodieFileGroup buildFileGroup(List baseFileCommitTimes) { @@ -507,7 +516,7 @@ private static Pair> getCleanCommitMetadata( extraMetadata.put(SAVEPOINTED_TIMESTAMPS, savepointsToTrack.stream().collect(Collectors.joining(","))); } HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata, - CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata); + CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata); return Pair.of(cleanMetadata, TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata)); } catch (IOException ex) { throw new UncheckedIOException(ex); @@ -548,14 +557,16 @@ private static void mockLastCleanCommit(HoodieTable hoodieTable, String timestam } private static void mockFewActiveInstants(HoodieTable hoodieTable, Map> activeInstantsToPartitions, - Map> savepointedCommitsToAdd) + Map> savepointedCommitsToAdd, boolean areCommitsForSavepointsRemoved) throws IOException { HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline(); List instants = new ArrayList<>(); Map> instantstoProcess = new HashMap<>(); instantstoProcess.putAll(activeInstantsToPartitions); - instantstoProcess.putAll(savepointedCommitsToAdd); - instantstoProcess.forEach((k,v) -> { + if (!areCommitsForSavepointsRemoved) { + instantstoProcess.putAll(savepointedCommitsToAdd); + } + instantstoProcess.forEach((k, v) -> { HoodieInstant hoodieInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, k); instants.add(hoodieInstant); Map> partitionToWriteStats = new HashMap<>();