[HUDI-7572] Avoid to schedule empty compaction plan without log files
danny0405 authored and yihua committed May 14, 2024
1 parent 9dad0ce commit 140e483
Showing 4 changed files with 44 additions and 24 deletions.
@@ -136,7 +136,12 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
     LOG.info("Total number of file slices " + totalFileSlices.value());

     if (operations.isEmpty()) {
-      LOG.warn("No operations are retrieved for " + metaClient.getBasePath());
+      LOG.warn("No operations are retrieved for {}", metaClient.getBasePathV2());
       return null;
     }

+    if (totalLogFiles.value() <= 0) {
+      LOG.warn("No log files are retrieved for {}", metaClient.getBasePathV2());
+      return null;
+    }
+
@@ -149,7 +154,7 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
             + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
             + ", Selected workload :" + compactionPlan);
     if (compactionPlan.getOperations().isEmpty()) {
-      LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
+      LOG.warn("After filtering, Nothing to compact for {}", metaClient.getBasePathV2());
     }
     return compactionPlan;
   }
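
The net effect of the new guard is that the planner now returns a null plan not only when no operations are selected but also when the selected file slices carry no log files, so the scheduler skips the compaction entirely instead of writing an empty plan. A minimal sketch of how a caller might treat that null result; the wrapper method and the planner type name are illustrative stand-ins, not the actual Hudi scheduling code:

  // Illustrative fragment only: translate a null or empty plan into "nothing to schedule".
  Option<HoodieCompactionPlan> scheduleIfPossible(CompactionPlanGenerator planner) throws IOException {
    HoodieCompactionPlan plan = planner.generateCompactionPlan(); // may now return null when there are no log files
    if (plan == null || plan.getOperations() == null || plan.getOperations().isEmpty()) {
      return Option.empty(); // no REQUESTED compaction instant is written to the timeline
    }
    return Option.of(plan);
  }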
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -53,6 +54,7 @@

 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +80,7 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
         .hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withProps(Collections.singletonMap(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name()))
         .withEmbeddedTimelineServerEnabled(true);
   }

@@ -163,6 +166,21 @@ protected void scheduleCompaction(String compactionInstantTime, SparkRDDWriteCli
     assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
   }

+  /**
+   * Tries to schedule a compaction plan and returns the latest pending compaction instant time.
+   *
+   * @param compactionInstantTime The given compaction instant time
+   * @param client                The write client
+   * @param cfg                   The write config
+   *
+   * @return The latest pending instant time.
+   */
+  protected String tryScheduleCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieWriteConfig cfg) {
+    client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+    return metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
+  }
+
   protected void scheduleAndExecuteCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieTable table,
                                               HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
     scheduleCompaction(compactionInstantTime, client, cfg);
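
Unlike scheduleCompaction, the new tryScheduleCompaction helper does not assert that an instant was created; it simply reports the latest pending compaction instant (or null) so a test can assert either outcome. A short hypothetical usage sketch; the instant time and assertion message below are made up for illustration:

  // Hypothetical test snippet: scheduling at an already-passed timestamp is now a silent no-op.
  String requestedTime = "001"; // deliberately older than the last committed instant
  assertNull(tryScheduleCompaction(requestedTime, client, cfg), "no pending compaction instant should appear");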
@@ -23,9 +23,11 @@
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -44,6 +46,8 @@

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -194,7 +198,7 @@ public void testInflightCompaction() throws Exception {

   @Test
   public void testScheduleIngestionBeforePendingCompaction() throws Exception {
-    // Case: Failure case. Latest pending compaction instant time must be earlier than this instant time
+    // Case: Non-serial case. Latest pending compaction instant time can be earlier than this instant time
     HoodieWriteConfig cfg = getConfig(false);
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
     SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
@@ -210,16 +214,17 @@ public void testScheduleIngestionBeforePendingCompaction() throws Exception {
         new ArrayList<>());

     // Schedule compaction but do not run them
+    String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
     scheduleCompaction(compactionInstantTime, client, cfg);
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant pendingCompactionInstant =
         metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
     assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");

-    assertThrows(IllegalArgumentException.class, () -> {
-      runNextDeltaCommits(client, readClient, Arrays.asList(failedInstantTime), records, cfg, false,
-          Arrays.asList(compactionInstantTime));
-    }, "Latest pending compaction instant time must be earlier than this instant time");
+    assertDoesNotThrow(() -> {
+      runNextDeltaCommits(client, readClient, Collections.singletonList(failedInstantTime), records, cfg, false,
+          Collections.singletonList(compactionInstantTime));
+    }, "Latest pending compaction instant time can be earlier than this instant time");
   }

   @Test
@@ -272,23 +277,15 @@ public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
     runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
         new ArrayList<>());

-    assertThrows(IllegalArgumentException.class, () -> {
-      // Schedule compaction but do not run them
-      scheduleCompaction(compactionInstantTime, client, cfg);
-    }, "Compaction Instant to be scheduled cannot have older timestamp");
+    // Schedule compaction but do not run them
+    assertNull(tryScheduleCompaction(compactionInstantTime, client, cfg), "Compaction Instant can be scheduled with older timestamp");

     // Schedule with timestamp same as that of committed instant
-    assertThrows(IllegalArgumentException.class, () -> {
-      // Schedule compaction but do not run them
-      scheduleCompaction(secondInstantTime, client, cfg);
-    }, "Compaction Instant to be scheduled cannot have same timestamp as committed instant");
+    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg), "Compaction Instant to be scheduled can have same timestamp as committed instant");

-    final String compactionInstantTime2 = "006";
-    scheduleCompaction(compactionInstantTime2, client, cfg);
-    assertThrows(IllegalArgumentException.class, () -> {
-      // Schedule compaction with the same times as a pending compaction
-      scheduleCompaction(secondInstantTime, client, cfg);
-    }, "Compaction Instant to be scheduled cannot have same timestamp as a pending compaction");
+    final String compactionInstantTime2 = HoodieActiveTimeline.createNewInstantTime();
+    // Schedule compaction but do not run them
+    assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg), "Compaction Instant can be scheduled with greater timestamp");
   }

   @Test
@@ -55,7 +55,7 @@ private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int
         .build();
   }

-  private HoodieWriteConfig getConfigDisableComapction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
+  private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
     return getConfigBuilder(false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -111,7 +111,7 @@ public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
   @Test
   public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() throws Exception {
     // Given: make 4 commits
-    HoodieWriteConfig cfg = getConfigDisableComapction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
+    HoodieWriteConfig cfg = getConfigDisableCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
     // turn off compaction table service to mock compaction service is down or very slow
     List<String> instants = IntStream.range(0, 4).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());

