[ENG-14372][HUDI-8248] Fixing Log Record reader to include rollback blocks with timestamps > maxInstant times (#942)

nsivabalan committed Oct 1, 2024
1 parent 3530e3d · commit b305102
Showing 4 changed files with 20 additions and 17 deletions.
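The four diffs below refactor the static HoodieLogBlockType.isDataOrDeleteBlock helper into instance methods and tighten scanner resource handling in the tests; per the commit title, the intent is that only data and delete blocks newer than the reader's max instant are skipped, while rollback command blocks are still processed. A minimal sketch of that gating logic, using simplified stand-in types (BlockType, LogBlock, and scan are illustrative, not the real reader internals) and plain lexicographic comparison in place of HoodieTimeline.compareTimestamps:

import java.util.List;

// Minimal sketch of the reader's gating logic; BlockType and LogBlock are
// simplified stand-ins for HoodieLogBlockType and HoodieLogBlock.
public class LogBlockGatingSketch {

  enum BlockType {
    DATA_BLOCK, DELETE_BLOCK, CDC_DATA_BLOCK, COMMAND_BLOCK, CORRUPT_BLOCK;

    // Mirrors the patched enum method: everything except command and
    // corrupt blocks counts as a data or delete block.
    boolean isDataOrDeleteBlock() {
      return this != COMMAND_BLOCK && this != CORRUPT_BLOCK;
    }
  }

  record LogBlock(BlockType type, String instantTime) {
    boolean isDataOrDeleteBlock() {
      return type.isDataOrDeleteBlock(); // delegate, as HoodieLogBlock now does
    }
  }

  // Only data/delete blocks are filtered by instant time; command (rollback)
  // blocks with later timestamps still reach the processing step.
  static void scan(List<LogBlock> blocks, String latestInstantTime) {
    for (LogBlock block : blocks) {
      if (block.isDataOrDeleteBlock()
          && block.instantTime().compareTo(latestInstantTime) > 0) {
        continue; // skip blocks written after the reader's max instant
      }
      System.out.println("processing " + block);
    }
  }

  public static void main(String[] args) {
    List<LogBlock> blocks = List.of(
        new LogBlock(BlockType.DATA_BLOCK, "001"),      // processed
        new LogBlock(BlockType.DATA_BLOCK, "003"),      // skipped: newer than max
        new LogBlock(BlockType.COMMAND_BLOCK, "003"));  // processed despite timestamp
    scan(blocks, "002");
  }
}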
File 1 of 4 — log record reader (scanInternalV1/V2):

@@ -254,7 +254,7 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
       HoodieLogBlock logBlock = logFormatReaderWrapper.next();
       final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
       totalLogBlocks.incrementAndGet();
-      if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType())) {
+      if (logBlock.isDataOrDeleteBlock()) {
         if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
           // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
           continue;
@@ -447,7 +447,7 @@ private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessin
         totalCorruptBlocks.incrementAndGet();
         continue;
       }
-      if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType())) {
+      if (logBlock.isDataOrDeleteBlock()) {
         if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
           // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
           continue;
File 2 of 4 — HoodieLogBlock (and its nested HoodieLogBlockType enum):

@@ -95,6 +95,10 @@ public byte[] getMagic() {

   public abstract HoodieLogBlockType getBlockType();
 
+  public boolean isDataOrDeleteBlock() {
+    return getBlockType().isDataOrDeleteBlock();
+  }
+
   public long getLogBlockLength() {
     throw new HoodieException("No implementation was provided");
   }
@@ -162,11 +166,10 @@ public static HoodieLogBlockType fromId(String id) {
     }
 
     /**
-     * @param logBlockType log block type to be inspected.
      * @returns true if the log block type refers to data or delete block. false otherwise.
      */
-    public static boolean isDataOrDeleteBlock(HoodieLogBlockType logBlockType) {
-      return logBlockType != HoodieLogBlockType.COMMAND_BLOCK && logBlockType != HoodieLogBlockType.CORRUPT_BLOCK;
+    public boolean isDataOrDeleteBlock() {
+      return this != HoodieLogBlockType.COMMAND_BLOCK && this != HoodieLogBlockType.CORRUPT_BLOCK;
     }
   }

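With this change the classification lives on the enum itself and is mirrored on HoodieLogBlock, so callers no longer pass the block type around. A quick sanity check of the new enum-level API; this assumes hudi-common from this commit on the classpath and that the nested enum sits in org.apache.hudi.common.table.log.block.HoodieLogBlock:

import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;

// Prints every block type and how the new instance method classifies it;
// only COMMAND_BLOCK and CORRUPT_BLOCK should print false.
public class BlockTypeCheck {
  public static void main(String[] args) {
    for (HoodieLogBlockType type : HoodieLogBlockType.values()) {
      System.out.println(type + " -> " + type.isDataOrDeleteBlock());
    }
  }
}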
File 3 of 4 — log format/scanner tests:

@@ -190,7 +190,7 @@ public void testHoodieLogBlockTypeIsDataOrDeleteBlock() {
     dataOrDeleteBlocks.add(HoodieLogBlockType.CDC_DATA_BLOCK);
 
     Arrays.stream(HoodieLogBlockType.values()).forEach(logBlockType -> {
-      assertEquals(HoodieLogBlockType.isDataOrDeleteBlock(logBlockType), dataOrDeleteBlocks.contains(logBlockType));
+      assertEquals(dataOrDeleteBlocks.contains(logBlockType), logBlockType.isDataOrDeleteBlock());
     });
   }
 
@@ -778,7 +778,7 @@ private List<HoodieLogFile> getSortedLogFilesList(List<Set<HoodieLogFile>> logFi
   }
 
   private void readAndValidate(Schema schema, String maxCommitTime, List<HoodieLogFile> logFiles, List<IndexedRecord> expectedRecords) throws IOException {
-    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+    try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
         .withStorage(storage)
         .withBasePath(basePath)
         .withLogFilePaths(
@@ -793,17 +793,17 @@ private void readAndValidate(Schema schema, String maxCommitTime, List<HoodieLog
         .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
         .withBitCaskDiskMapCompressionEnabled(false)
         .withOptimizedLogBlocksScan(false)
-        .build();
+        .build()) {
 
-    List<IndexedRecord> scannedRecords = new ArrayList<>();
-    for (HoodieRecord record : scanner) {
-      scannedRecords.add((IndexedRecord)
-          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
-    }
+      List<IndexedRecord> scannedRecords = new ArrayList<>();
+      for (HoodieRecord record : scanner) {
+        scannedRecords.add((IndexedRecord)
+            ((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
+      }
 
-    assertEquals(sort(expectedRecords), sort(scannedRecords),
-        "Scanner records count should be the same as appended records");
-    scanner.close();
+      assertEquals(sort(expectedRecords), sort(scannedRecords),
+          "Scanner records count should be the same as appended records");
+    }
   }
 
   private Pair<List<IndexedRecord>, Set<HoodieLogFile>> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType,
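readAndValidate now closes the scanner via try-with-resources instead of a trailing scanner.close(), so the scanner is released even when an assertion fails mid-block. A self-contained illustration of the difference, with a stand-in resource in place of HoodieMergedLogRecordScanner:

// Stand-in resource showing why the test moved to try-with-resources:
// close() runs even when the body throws.
public class TryWithResourcesSketch {

  static class FakeScanner implements AutoCloseable {
    @Override
    public void close() {
      System.out.println("scanner closed");
    }
  }

  public static void main(String[] args) {
    try (FakeScanner scanner = new FakeScanner()) {
      // Under the old pattern, an explicit scanner.close() placed after the
      // assertions would be skipped when this throws.
      throw new AssertionError("simulated failing assertion");
    } catch (AssertionError expected) {
      // By the time we get here, close() has already run.
      System.out.println("caught: " + expected.getMessage());
    }
  }
}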
File 4 of 4 — rollback functional test:

@@ -156,8 +156,8 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp
     final String p3 = "p3";
     List<HoodieRecord> insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass);
     List<HoodieRecord> updatesAtEpoch5 = getUpdates(insertsAtEpoch0.subList(0, 4), p2, 5, payloadClass);
-    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
 
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
       // 1st batch: inserts
       String commitTimeAtEpoch0 = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(commitTimeAtEpoch0);
