[HUDI-7510] Loosen the compaction scheduling and rollback check for MDT (#10874)
danny0405 authored Apr 1, 2024
1 parent 26c00a3 commit 9b094e6
Showing 8 changed files with 202 additions and 190 deletions.
@@ -1104,21 +1104,13 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime)
HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
HoodieTimeline deltacommitsSinceCompaction = deltaCommitsInfo.get().getKey();

// The deltacommit that will be rolled back
HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);

validateRollback(commitToRollbackInstantTime, compactionInstant, deltacommitsSinceCompaction);

// Let's apply a delta commit with the DT's rollback instant (with a special suffix) containing the following records:
// a. any log files that were added as part of the rollback commit metadata
// b. log files added by the commit in the DT that is being rolled back. By "rolled back", we mean a rollback block will be added; it does not mean the files will be deleted.
// Both of the above lists should only be added to the FILES partition.

String rollbackInstantTime = createRollbackTimestamp(instantTime);
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, dataMetaClient, rollbackMetadata, instantTime));

// The deltacommit that will be rolled back
HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
LOG.info("Rolling back MDT deltacommit " + commitToRollbackInstantTime);
String rollbackInstantTime = createRollbackTimestamp(instantTime);
if (!getWriteClient().rollback(commitToRollbackInstantTime, rollbackInstantTime)) {
throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitToRollbackInstantTime);
}
@@ -1134,8 +1126,9 @@ protected void validateRollback(
String commitToRollbackInstantTime,
HoodieInstant compactionInstant,
HoodieTimeline deltacommitsSinceCompaction) {
// The commit being rolled back should not be earlier than the latest compaction on the MDT. Compaction on MDT only occurs when all actions
// are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore.
// The commit being rolled back should not be earlier than the latest compaction on the MDT, because the rollback would not change the latest (compacted) file slice anyway.
// Compaction on the MDT only occurs when all actions are completed on the dataset.
// Hence, this case implies a rollback of a completed commit, which should actually be handled using restore.
if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
final String compactionInstantTime = compactionInstant.getTimestamp();
if (commitToRollbackInstantTime.length() == compactionInstantTime.length() && HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime)) {
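
To make the comparison concrete, here is a minimal standalone sketch of this guard with hypothetical timestamps. It assumes `HoodieTimeline.LESSER_THAN_OR_EQUALS` is a plain lexicographic comparison over instant-time strings, which matches chronological order when the strings have equal length:

```java
// Minimal sketch of the guard above (hypothetical timestamps, not Hudi code).
// Instant times of equal length compare lexicographically in chronological order,
// which is what HoodieTimeline.LESSER_THAN_OR_EQUALS relies on.
public class RollbackGuardSketch {
  public static void main(String[] args) {
    String commitToRollback = "20240401103000000"; // hypothetical DT commit
    String latestCompaction = "20240401110000000"; // hypothetical MDT compaction instant
    boolean tooOldToRollback = commitToRollback.length() == latestCompaction.length()
        && commitToRollback.compareTo(latestCompaction) <= 0;
    System.out.println(tooOldToRollback); // true -> must be handled with restore, not rollback
  }
}
```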
@@ -1319,9 +1312,8 @@ public void performTableServices(Option<String> inFlightInstantTimestamp) {
.getTimestamp();
LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + ", running clean operations.");
cleanIfNecessary(writeClient, latestDeltacommitTime);

// Do timeline validation before scheduling compaction/logCompaction operations.
if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
if (validateCompactionScheduling()) {
compactIfNecessary(writeClient, latestDeltacommitTime);
}
writeClient.archive();
@@ -1358,10 +1350,12 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
* deltacommit.
*/
protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
// Trigger compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
final String compactionInstantTime = writeClient.createNewInstantTime(false);
// IMPORTANT: Trigger compaction with the max instant time that is smaller than (or equal to) the earliest pending instant from the DT.
// The compaction planner will filter out log files whose completion time is greater than that instant.
// See BaseHoodieCompactionPlanGenerator.generateCompactionPlan for more details.
final String compactionInstantTime = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBeforeOrEquals(latestDeltacommitTime).firstInstant().map(HoodieInstant::getTimestamp)
.orElse(writeClient.createNewInstantTime(false));

// We need to avoid checking compaction with the same instant again.
// Let's say we trigger compaction after C5 in the MDT, so the compaction completes with C4001, but C5 crashed before completing in the MDT.
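
To make the selection rule concrete, a toy model with plain strings standing in for `HoodieInstant`s; the helper and values below are hypothetical, not Hudi API:

```java
import java.util.Optional;
import java.util.TreeSet;

// Toy model of the instant-time selection above: the compaction instant is the earliest
// still-pending DT instant at or before the latest MDT deltacommit, falling back to a
// fresh instant time when nothing is pending. Log files that complete after that instant
// are then left out of the compaction plan.
public class CompactionInstantPickSketch {
  static String pick(TreeSet<String> pendingDtInstants, String latestMdtDeltacommit, String freshInstant) {
    Optional<String> earliestPending = pendingDtInstants.stream()
        .filter(t -> t.compareTo(latestMdtDeltacommit) <= 0)
        .findFirst();
    return earliestPending.orElse(freshInstant);
  }

  public static void main(String[] args) {
    TreeSet<String> pending = new TreeSet<>();
    pending.add("0000003"); // hypothetical inflight DT commit
    System.out.println(pick(pending, "0000005", "0000006"));        // 0000003: bounded by the pending instant
    System.out.println(pick(new TreeSet<>(), "0000005", "0000006")); // 0000006: nothing pending
  }
}
```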
@@ -1410,35 +1404,19 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
/**
* Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled.
*/
protected boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
// we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table.
// Whenever you want to change this logic, please ensure all below scenarios are considered.
// a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
// b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, the latest compaction instant time in MDT represents
// any instants before that is already synced with metadata table.
// c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every
// instant before c4 is synced with metadata table.
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();

if (!pendingInstants.isEmpty()) {
checkNumDeltaCommits(metadataMetaClient, dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
LOG.info(String.format(
"Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s",
pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray())));
return false;
}

// Check if there are any pending compaction or log compaction instants in the timeline.
// If pending compact/logCompaction operations are found abort scheduling new compaction/logCompaction operations.
Option<HoodieInstant> pendingLogCompactionInstant =
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
Option<HoodieInstant> pendingCompactionInstant =
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present",
pendingCompactionInstant, pendingLogCompactionInstant));
return false;
protected boolean validateCompactionScheduling() {
// When log compaction is enabled, the ordering of log-compactions and compactions must be preserved, because metadata payloads such as RLI
// only have proc-time ordering semantics. "Preserved" means the completion order of log-compactions/compactions is the same as their start order.
if (metadataWriteConfig.isLogCompactionEnabled()) {
Option<HoodieInstant> pendingLogCompactionInstant =
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
Option<HoodieInstant> pendingCompactionInstant =
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
LOG.warn(String.format("Not scheduling compaction or logCompaction, since a pending compaction instant %s or logCompaction %s instant is present",
pendingCompactionInstant, pendingLogCompactionInstant));
return false;
}
}
return true;
}
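
To see why completion order matters for proc-time-ordered payloads such as RLI (the record-level index), consider a toy last-write-wins merge; this is illustrative only, not Hudi code:

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of proc-time ordering: with last-write-wins merging, replaying the
// same two batches in a different completion order yields a different index. This is why
// a pending (log-)compaction blocks scheduling a new one when log compaction is enabled.
public class ProcTimeOrderingSketch {
  public static void main(String[] args) {
    Map<String, String> batchA = Map.of("key1", "fileGroup-1"); // written first
    Map<String, String> batchB = Map.of("key1", "fileGroup-2"); // written second

    Map<String, String> index = new HashMap<>();
    index.putAll(batchA);
    index.putAll(batchB);
    System.out.println(index.get("key1")); // fileGroup-2: correct, B completed last

    index.clear();
    index.putAll(batchB);
    index.putAll(batchA);
    System.out.println(index.get("key1")); // fileGroup-1: wrong if B is logically newer
  }
}
```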
@@ -159,7 +159,6 @@ public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata ro
if (writer != null) {
writer.update(rollbackMetadata, instantTime);
}
super.addRollbackCompleted(instantTime, rollbackMetadata, false);
return this;
}

@@ -32,7 +32,6 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -172,24 +171,6 @@ protected void commitInternal(String instantTime, Map<MetadataPartitionType, Hoo
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, dataMetaClient.getTableConfig().getMetadataPartitions()));
}

/**
* Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled.
*/
@Override
protected boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
// Allows compaction of the metadata table to run regardless of inflight instants
return true;
}

@Override
protected void validateRollback(String commitToRollbackInstantTime, HoodieInstant compactionInstant, HoodieTimeline deltacommitsSinceCompaction) {
// ignore, flink has more radical compression strategy, it is very probably that
// the latest compaction instant has greater timestamp than the instant to roll back.

// The limitation can be relaxed because the log reader of MDT only accepts valid instants
// based on the DT timeline, so the base file of MDT does not include un-committed instants.
}

@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet.");
@@ -59,6 +59,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -69,6 +70,7 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
@@ -110,7 +112,6 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Time;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.AfterEach;
@@ -167,7 +168,6 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -578,8 +578,8 @@ public void testMetadataTableCompactionWithPendingInstants() throws Exception {
doWriteOperation(testTable, "0000007", INSERT);

tableMetadata = metadata(writeConfig, context);
// verify that compaction of metadata table does not kick in.
assertFalse(tableMetadata.getLatestCompactionTime().isPresent());
// verify that compaction of the metadata table kicks in.
assertTrue(tableMetadata.getLatestCompactionTime().isPresent(), "Compaction of the metadata table did not kick in");

// move inflight to completed
testTable.moveInflightCommitToComplete("0000003", inflightCommitMeta);
@@ -1051,7 +1051,6 @@ public void testRollbackOperationsNonPartitioned() throws Exception {
*/
@Test
public void testManualRollbacks() throws Exception {
boolean populateMateFields = false;
init(COPY_ON_WRITE, false);
// Setting to archive more aggressively on the Metadata Table than the Dataset
final int maxDeltaCommitsBeforeCompaction = 4;
@@ -1082,23 +1081,17 @@ }
}
validateMetadata(testTable);

// We can only rollback those commits whose deltacommit have not been archived yet.
int numRollbacks = 0;
boolean exceptionRaised = false;
// We can only roll back commits whose deltacommits have not been archived yet.
List<HoodieInstant> allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants().collect(Collectors.toList());
for (HoodieInstant instantToRollback : allInstants) {
try {
testTable.doRollback(instantToRollback.getTimestamp(), String.valueOf(Time.now()));
testTable.doRollback(instantToRollback.getTimestamp(), metaClient.createNewInstantTime());
validateMetadata(testTable);
++numRollbacks;
} catch (HoodieMetadataException e) {
// This is expected since we are rolling back commits that are older than the latest compaction on the MDT
break;
}
}
// Since each rollback also creates a deltacommit, we can only support rolling back of half of the original
// instants present before rollback started.
// assertTrue(numRollbacks >= minArchiveCommitsDataset / 2, "Rollbacks of non archived instants should work");
}
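
The switch away from Hadoop's `Time.now()` presumably matters because it returns epoch milliseconds rather than a Hudi-formatted instant time; a quick contrast, assuming the `yyyyMMddHHmmssSSS` instant shape:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Illustrative contrast (assumes Hudi instants use a yyyyMMddHHmmssSSS-style format):
// epoch millis from Time.now()/System.currentTimeMillis() neither parse nor sort like
// instant times, while metaClient.createNewInstantTime() produces the proper shape.
public class InstantTimeShapeSketch {
  public static void main(String[] args) {
    System.out.println(System.currentTimeMillis()); // e.g. 1711929600000 -> not a valid instant time
    System.out.println(new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())); // e.g. 20240401120000000
  }
}
```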

/**
@@ -1178,7 +1171,7 @@ public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType, bool
doCompaction(testTable, instantTime5, nonPartitionedDataset);
}
// add 60s to commitTime6 to make sure it is greater than the compaction instant triggered by the previous commit
String commitTime6 = metaClient.createNewInstantTime() + + 60000L;
String commitTime6 = HoodieInstantTimeGenerator.instantTimePlusMillis(InProcessTimeGenerator.createNewInstantTime(), 60000L);
doWriteOperation(testTable, commitTime6, UPSERT, nonPartitionedDataset);
String instantTime7 = metaClient.createNewInstantTime();
doRollback(testTable, commitTime6, instantTime7);
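
The removed line hides a subtle bug worth spelling out: `+ + 60000L` is string concatenation followed by a unary plus, not time arithmetic. A tiny demonstration with a hypothetical timestamp:

```java
// Demonstration of the bug in the removed line: string + (unary plus long) concatenates,
// appending "60000" instead of adding 60 seconds. The fix delegates the arithmetic to
// HoodieInstantTimeGenerator.instantTimePlusMillis(instant, 60000L).
public class InstantConcatBugSketch {
  public static void main(String[] args) {
    String commitTime = "20240401120000000"; // hypothetical instant time
    System.out.println(commitTime + + 60000L); // 2024040112000000060000 -> garbage, not +60s
  }
}
```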
@@ -2301,22 +2294,21 @@ public void testErrorCases() throws Exception {
@Test
public void testMetadataTableWithLongLog() throws Exception {
init(COPY_ON_WRITE, false);
final int maxNumDeltacommits = 3;
final int maxNumDeltaCommits = 3;
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.enableMetrics(false)
.withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltacommits + 100)
.withMaxNumDeltacommitsWhenPending(maxNumDeltacommits)
.withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommits + 100)
.withMaxNumDeltacommitsWhenPending(maxNumDeltaCommits)
.build()).build();
initWriteConfigAndMetatableWriter(writeConfig, true);
testTable.addRequestedCommit(String.format("%016d", 0));
for (int i = 1; i <= maxNumDeltacommits; i++) {
for (int i = 1; i <= maxNumDeltaCommits; i++) {
doWriteOperation(testTable, String.format("%016d", i));
}
int instant = maxNumDeltacommits + 1;
Throwable t = assertThrows(HoodieMetadataException.class, () -> doWriteOperation(testTable, String.format("%016d", instant)));
assertTrue(t.getMessage().startsWith(String.format("Metadata table's deltacommits exceeded %d: ", maxNumDeltacommits)));
int instant = maxNumDeltaCommits + 1;
assertDoesNotThrow(() -> doWriteOperation(testTable, String.format("%016d", instant)));
}
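
The flipped assertion reflects the loosened check: piling up more than `withMaxNumDeltacommitsWhenPending` deltacommits behind a pending instant no longer aborts the write, because MDT compaction can now be scheduled despite pending DT instants. A condensed sketch of the config this test exercises (values mirror the test; the import path is an assumption):

```java
import org.apache.hudi.common.config.HoodieMetadataConfig;

// Condensed view of the config under test: regular compaction would wait for
// maxNumDeltaCommits + 100 deltacommits, and previously exceeding maxNumDeltaCommits
// deltacommits behind a pending instant raised a HoodieMetadataException.
public class LongLogConfigSketch {
  static HoodieMetadataConfig metadataConfig() {
    final int maxNumDeltaCommits = 3;
    return HoodieMetadataConfig.newBuilder()
        .enable(true)
        .enableMetrics(false)
        .withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommits + 100)
        .withMaxNumDeltacommitsWhenPending(maxNumDeltaCommits)
        .build();
  }
}
```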

@Test
@@ -107,7 +107,7 @@ public void init(HoodieTableType tableType, Option<HoodieWriteConfig> writeConfi
initWriteConfigAndMetatableWriter(this.writeConfig, enableMetadataTable);
}

protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) throws IOException {
protected void initWriteConfigAndMetatableWriter(HoodieWriteConfig writeConfig, boolean enableMetadataTable) {
this.writeConfig = writeConfig;
if (enableMetadataTable) {
metadataWriter = JavaHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context, Option.empty());