Skip to content

Commit

Permalink
[HUDI-6538] Refactor methods in TimelineDiffHelper class (#10938)
Browse files: browse the repository at this point in the history
  • Loading branch information
wombatu-kun authored Apr 1, 2024
1 parent 9b094e6 commit 44ab6f3
Showing 1 changed file with 21 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public class TimelineDiffHelper {

private static final Logger LOG = LoggerFactory.getLogger(TimelineDiffHelper.class);

private TimelineDiffHelper() {
}

public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
HoodieTimeline newTimeline) {
HoodieTimeline newTimeline) {

HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
Expand All @@ -57,14 +60,14 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
List<HoodieInstant> newInstants = new ArrayList<>();

// Check if any pending compaction is lost. If so, do not allow incremental timeline sync.
List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingCompactionTransitions(oldT, newT);
List<Pair<HoodieInstant, HoodieInstant>> compactionInstants = getPendingActionTransitions(oldT.filterPendingCompactionTimeline(),
newT, HoodieTimeline.COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION);
List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
.filter(instantPair -> instantPair.getValue() == null).map(Pair::getKey).collect(Collectors.toList());
if (!lostPendingCompactions.isEmpty()) {
// If a compaction is unscheduled, fall back to a complete refresh of the fs view since some log files could have been
// moved. It's unsafe to incrementally sync in that case.
LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
+ lostPendingCompactions);
LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are: {}", lostPendingCompactions);
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}
List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
Expand All @@ -74,7 +77,8 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline

newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);

List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingActionTransitions(oldTimeline.filterPendingLogCompactionTimeline(),
newTimeline, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION);
List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
Expand All @@ -87,52 +91,24 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
}
}

/**
* Getting pending log compaction transitions.
*/
private static List<Pair<HoodieInstant, HoodieInstant>> getPendingLogCompactionTransitions(HoodieTimeline oldTimeline,
HoodieTimeline newTimeline) {
Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());

return oldTimeline.filterPendingLogCompactionTimeline().getInstantsAsStream().map(instant -> {
if (newTimelineInstants.contains(instant)) {
return Pair.of(instant, instant);
} else {
HoodieInstant logCompacted =
new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp());
if (newTimelineInstants.contains(logCompacted)) {
return Pair.of(instant, logCompacted);
}
HoodieInstant inflightLogCompacted =
new HoodieInstant(State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, instant.getTimestamp());
if (newTimelineInstants.contains(inflightLogCompacted)) {
return Pair.of(instant, inflightLogCompacted);
}
return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
}
}).collect(Collectors.toList());
}

/**
* Getting pending compaction transitions.
*/
private static List<Pair<HoodieInstant, HoodieInstant>> getPendingCompactionTransitions(HoodieTimeline oldTimeline,
HoodieTimeline newTimeline) {
private static List<Pair<HoodieInstant, HoodieInstant>> getPendingActionTransitions(HoodieTimeline pendingActionTimelineFromOld,
HoodieTimeline newTimeline,
String completedAction, String pendingAction) {
Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());

return oldTimeline.filterPendingCompactionTimeline().getInstantsAsStream().map(instant -> {
return pendingActionTimelineFromOld.getInstantsAsStream().map(instant -> {
if (newTimelineInstants.contains(instant)) {
return Pair.of(instant, instant);
} else {
HoodieInstant compacted =
new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, instant.getTimestamp());
if (newTimelineInstants.contains(compacted)) {
return Pair.of(instant, compacted);
HoodieInstant completedInstant =
new HoodieInstant(State.COMPLETED, completedAction, instant.getTimestamp());
if (newTimelineInstants.contains(completedInstant)) {
return Pair.of(instant, completedInstant);
}
HoodieInstant inflightCompacted =
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp());
if (newTimelineInstants.contains(inflightCompacted)) {
return Pair.of(instant, inflightCompacted);
HoodieInstant inflightInstant =
new HoodieInstant(State.INFLIGHT, pendingAction, instant.getTimestamp());
if (newTimelineInstants.contains(inflightInstant)) {
return Pair.of(instant, inflightInstant);
}
return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
}
Expand Down

0 comments on commit 44ab6f3

Please sign in to comment.