Skip to content

Commit

Permalink
[HUDI-7557] Fix incremental cleaner when commit for savepoint removed (
Browse files Browse the repository at this point in the history
  • Loading branch information
codope authored and yihua committed May 14, 2024
1 parent 58b0d24 commit 2adac11
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ private List<String> getPartitionsFromDeletedSavepoint(HoodieCleanMetadata clean
Option<HoodieInstant> instantOption = hoodieTable.getCompletedCommitsTimeline().filter(instant -> instant.getTimestamp().equals(savepointCommit)).firstInstant();
if (!instantOption.isPresent()) {
LOG.warn("Skipping to process a commit for which savepoint was removed as the instant moved to archived timeline already");
return Stream.empty();
}
HoodieInstant instant = instantOption.get();
return getPartitionsForInstants(instant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant, List<H
void testPartitionsForIncrCleaning(HoodieWriteConfig config, String earliestInstant,
String lastCompletedTimeInLastClean, String lastCleanInstant, String earliestInstantsInLastClean, List<String> partitionsInLastClean,
Map<String, List<String>> savepointsTrackedInLastClean, Map<String, List<String>> activeInstantsPartitions,
Map<String, List<String>> savepoints, List<String> expectedPartitions) throws IOException {
Map<String, List<String>> savepoints, List<String> expectedPartitions, boolean areCommitsForSavepointsRemoved) throws IOException {
HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
// setup savepoint mocks
Set<String> savepointTimestamps = savepoints.keySet().stream().collect(Collectors.toSet());
when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
if (!savepoints.isEmpty()) {
for (Map.Entry<String, List<String>> entry: savepoints.entrySet()) {
for (Map.Entry<String, List<String>> entry : savepoints.entrySet()) {
Pair<HoodieSavepointMetadata, Option<byte[]>> savepointMetadataOptionPair = getSavepointMetadata(entry.getValue());
HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, entry.getKey());
when(activeTimeline.getInstantDetails(instant)).thenReturn(savepointMetadataOptionPair.getRight());
Expand All @@ -157,7 +157,7 @@ void testPartitionsForIncrCleaning(HoodieWriteConfig config, String earliestInst
Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadataOptionPair =
getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, earliestInstantsInLastClean, lastCompletedTimeInLastClean, savepointsTrackedInLastClean.keySet());
mockLastCleanCommit(mockHoodieTable, lastCleanInstant, earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair);
mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, savepointsTrackedInLastClean);
mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, savepointsTrackedInLastClean, areCommitsForSavepointsRemoved);

// Trigger clean and validate partitions to clean.
CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context, mockHoodieTable, config);
Expand Down Expand Up @@ -333,17 +333,17 @@ static Stream<Arguments> keepLatestByHoursOrCommitsArgs() {

static Stream<Arguments> keepLatestByHoursOrCommitsArgsIncrCleanPartitions() {
String earliestInstant = "20231204194919610";
String earliestInstantPlusTwoDays = "20231206194919610";
String earliestInstantPlusTwoDays = "20231206194919610";
String lastCleanInstant = earliestInstantPlusTwoDays;
String earliestInstantMinusThreeDays = "20231201194919610";
String earliestInstantMinusFourDays = "20231130194919610";
String earliestInstantMinusFiveDays = "20231129194919610";
String earliestInstantMinusSixDays = "20231128194919610";
String earliestInstantInLastClean = earliestInstantMinusSixDays;
String lastCompletedInLastClean = earliestInstantMinusSixDays;
String earliestInstantMinusOneWeek = "20231127194919610";
String earliestInstantMinusOneWeek = "20231127194919610";
String savepoint2 = earliestInstantMinusOneWeek;
String earliestInstantMinusOneMonth = "20231104194919610";
String earliestInstantMinusOneMonth = "20231104194919610";
String savepoint3 = earliestInstantMinusOneMonth;

List<String> threePartitionsInActiveTimeline = Arrays.asList(PARTITION1, PARTITION2, PARTITION3);
Expand All @@ -361,66 +361,74 @@ static Stream<Arguments> keepLatestByHoursOrCommitsArgsIncrCleanPartitions() {
List<Arguments> arguments = new ArrayList<>();

// no savepoints tracked in last clean and no additional savepoints. all partitions in uncleaned instants should be expected
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.emptyMap(),
activeInstantsPartitionsMap3, Collections.emptyMap(), threePartitionsInActiveTimeline));
activeInstantsPartitionsMap3, Collections.emptyMap(), threePartitionsInActiveTimeline, false));

// a new savepoint is added after last clean. but rest of uncleaned touches all partitions, and so all partitions are expected
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1), Collections.emptyMap(),
activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline));
activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline, false));

// previous clean tracks a savepoint which exists in timeline still. only 2 partitions are touched by uncleaned instants. only 2 partitions are expected
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)),
activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline));
activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false));

// savepoint tracked in previous clean was removed(touching partition1). latest uncleaned touched 2 other partitions. So, in total 3 partitions are expected.
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)),
activeInstantsPartitionsMap2, Collections.emptyMap(), threePartitionsInActiveTimeline));
activeInstantsPartitionsMap2, Collections.emptyMap(), threePartitionsInActiveTimeline, false));

// previous savepoint still exists and touches partition1. uncleaned touches only partition2 and partition3. expected partition2 and partition3.
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)),
activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline));
activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline, false));

// a new savepoint was added compared to previous clean. all 2 partitions are expected since uncleaned commits touched just 2 partitions.
Map<String, List<String>> latestSavepoints = new HashMap<>();
latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION1));
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)),
activeInstantsPartitionsMap2, latestSavepoints, twoPartitionsInActiveTimeline));
activeInstantsPartitionsMap2, latestSavepoints, twoPartitionsInActiveTimeline, false));

// 2 savepoints were tracked in previous clean. one of them is removed in latest. A partition which was part of the removed savepoint should be added in final
// list of partitions to clean
Map<String, List<String>> previousSavepoints = new HashMap<>();
latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION2));
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1),
previousSavepoints, activeInstantsPartitionsMap2, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), twoPartitionsInActiveTimeline));
previousSavepoints, activeInstantsPartitionsMap2, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), twoPartitionsInActiveTimeline, false));

// 2 savepoints were tracked in previous clean. one of them is removed in latest. But a partition part of removed savepoint is already touched by uncleaned commits.
// so we expect all 3 partitions to be in final list.
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1),
previousSavepoints, activeInstantsPartitionsMap3, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), threePartitionsInActiveTimeline));
previousSavepoints, activeInstantsPartitionsMap3, Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), threePartitionsInActiveTimeline, false));

// unpartitioned test case. savepoint removed.
List<String> unPartitionsInActiveTimeline = Arrays.asList(StringUtils.EMPTY_STRING);
Map<String, List<String>> activeInstantsUnPartitionsMap = new HashMap<>();
activeInstantsUnPartitionsMap.put(earliestInstantMinusThreeDays, unPartitionsInActiveTimeline);

arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(StringUtils.EMPTY_STRING),
Collections.singletonMap(savepoint2, Collections.singletonList(StringUtils.EMPTY_STRING)),
activeInstantsUnPartitionsMap, Collections.emptyMap(), unPartitionsInActiveTimeline));
activeInstantsUnPartitionsMap, Collections.emptyMap(), unPartitionsInActiveTimeline, false));

// savepoint tracked in previous clean was removed(touching partition1). active instants does not have the instant corresponding to the savepoint.
// latest uncleaned touched 2 other partitions. So, in total 2 partitions are expected.
activeInstantsPartitionsMap2.remove(earliestInstantMinusOneWeek);
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(
earliestInstant, lastCompletedInLastClean, lastCleanInstant, earliestInstantInLastClean, Collections.singletonList(PARTITION1),
Collections.singletonMap(savepoint2, Collections.singletonList(PARTITION1)),
activeInstantsPartitionsMap2, Collections.emptyMap(), twoPartitionsInActiveTimeline, true));

return arguments.stream();
}
Expand Down Expand Up @@ -451,19 +459,20 @@ private static List<Arguments> buildArgumentsForCleanByHoursAndCommitsCases(Stri
}

// helper to build common cases for the two policies
private static List<Arguments> buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(String earliestInstant,
String latestCompletedInLastClean,
String lastKnownCleanInstantTime,
String earliestInstantInLastClean,
List<String> partitionsInLastClean,
Map<String, List<String>> savepointsTrackedInLastClean,
Map<String, List<String>> activeInstantsToPartitionsMap,
Map<String, List<String>> savepoints,
List<String> expectedPartitions) {
private static List<Arguments> buildArgumentsForCleanByHoursAndCommitsIncrCleanPartitionsCases(String earliestInstant,
String latestCompletedInLastClean,
String lastKnownCleanInstantTime,
String earliestInstantInLastClean,
List<String> partitionsInLastClean,
Map<String, List<String>> savepointsTrackedInLastClean,
Map<String, List<String>> activeInstantsToPartitionsMap,
Map<String, List<String>> savepoints,
List<String> expectedPartitions,
boolean areCommitsForSavepointsRemoved) {
return Arrays.asList(Arguments.of(getCleanByHoursConfig(), earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions),
earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions, areCommitsForSavepointsRemoved),
Arguments.of(getCleanByCommitsConfig(), earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions));
earliestInstantInLastClean, partitionsInLastClean, savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, expectedPartitions, areCommitsForSavepointsRemoved));
}

private static HoodieFileGroup buildFileGroup(List<String> baseFileCommitTimes) {
Expand Down Expand Up @@ -508,7 +517,7 @@ private static Pair<HoodieCleanMetadata, Option<byte[]>> getCleanCommitMetadata(
extraMetadata.put(SAVEPOINTED_TIMESTAMPS, savepointsToTrack.stream().collect(Collectors.joining(",")));
}
HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata);
CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata.isEmpty() ? null : extraMetadata);
return Pair.of(cleanMetadata, TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
} catch (IOException ex) {
throw new UncheckedIOException(ex);
Expand Down Expand Up @@ -549,14 +558,16 @@ private static void mockLastCleanCommit(HoodieTable hoodieTable, String timestam
}

private static void mockFewActiveInstants(HoodieTable hoodieTable, Map<String, List<String>> activeInstantsToPartitions,
Map<String, List<String>> savepointedCommitsToAdd)
Map<String, List<String>> savepointedCommitsToAdd, boolean areCommitsForSavepointsRemoved)
throws IOException {
HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
List<HoodieInstant> instants = new ArrayList<>();
Map<String, List<String>> instantstoProcess = new HashMap<>();
instantstoProcess.putAll(activeInstantsToPartitions);
instantstoProcess.putAll(savepointedCommitsToAdd);
instantstoProcess.forEach((k,v) -> {
if (!areCommitsForSavepointsRemoved) {
instantstoProcess.putAll(savepointedCommitsToAdd);
}
instantstoProcess.forEach((k, v) -> {
HoodieInstant hoodieInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, k);
instants.add(hoodieInstant);
Map<String, List<HoodieWriteStat>> partitionToWriteStats = new HashMap<>();
Expand Down

0 comments on commit 2adac11

Please sign in to comment.