Skip to content

Commit

Permalink
[HUDI-7549] Reverting spurious log block deduction with LogRecordReader (#10922)
Browse files Browse the repository at this point in the history

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
  • Loading branch information
nsivabalan and yihua committed May 15, 2024
1 parent cb2d27d commit acebbb8
Show file tree
Hide file tree
Showing 11 changed files with 6 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;

Expand Down Expand Up @@ -132,11 +131,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
private boolean useWriterSchema = false;

private Properties recordProperties = new Properties();
// Block sequence number, used by the log reader to detect duplicate log blocks added due to Spark task retries.
// It should always start at 0 for a given file slice. For rollovers and delete blocks, it is incremented by 1.
private int blockSequenceNumber = 0;
// On task failures, a given task may be retried, so this attempt number tracks the number of attempts.
private int attemptNumber = 0;

/**
* This is used by log compaction only.
Expand All @@ -148,7 +142,6 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
this.useWriterSchema = true;
this.isLogCompaction = true;
this.header.putAll(header);
this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
}

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Expand All @@ -158,7 +151,6 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
}

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Expand Down Expand Up @@ -455,13 +447,11 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();

blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
addBlockIdentifier()), keyField));
blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, header, keyField));
}

if (appendDeleteBlocks && recordsToDelete.size() > 0) {
blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
addBlockIdentifier())));
blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), header));
}

if (blocks.size() > 0) {
Expand Down Expand Up @@ -558,10 +548,6 @@ protected boolean needsUpdateLocation() {
return true;
}

/**
 * Hook telling the append path whether a block identifier may be attached to log block headers.
 * The value is passed through to {@code getUpdatedHeader}, which only writes the
 * {@code BLOCK_IDENTIFIER} header entry when this is {@code true}.
 *
 * @return {@code true} in this implementation.
 */
protected boolean addBlockIdentifier() {
return true;
}

private void writeToBuffer(HoodieRecord<T> record) {
if (!partitionPath.equals(record.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
Expand Down Expand Up @@ -635,16 +621,6 @@ private HoodieLogBlock.HoodieLogBlockType pickLogDataBlockFormat() {
}
}

/**
 * Returns a fresh copy of {@code header}, optionally tagged with a block identifier.
 *
 * <p>The {@code BLOCK_IDENTIFIER} entry has the form {@code "<attemptNumber>,<blockSequenceNumber>"}
 * and is only added when {@code addBlockIdentifier} is set and the table at the configured base path
 * is not the metadata table. The input map is never modified.
 *
 * @param header              header entries to copy
 * @param blockSequenceNumber sequence number of the block being written
 * @param attemptNumber       attempt number of the writing task
 * @param config              write config, used to look up the table base path
 * @param addBlockIdentifier  whether a block identifier may be added at all
 * @return a new map holding the copied (and possibly extended) header
 */
private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber,
    HoodieWriteConfig config, boolean addBlockIdentifier) {
  final Map<HeaderMetadataType, String> headerCopy = new HashMap<>(header);
  if (!addBlockIdentifier) {
    return headerCopy;
  }
  // Block sequence numbers are added only for the data table.
  if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
    return headerCopy;
  }
  headerCopy.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber + "," + blockSequenceNumber);
  return headerCopy;
}

private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
HoodieLogBlock.HoodieLogBlockType logDataBlockFormat,
List<HoodieRecord> records,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,4 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return null;
}

/**
 * Returns no supplier at all ({@code null}).
 *
 * NOTE(review): the append-handle constructors shown in this change call
 * {@code getAttemptNumberSupplier().get()}, which would throw a NullPointerException with this
 * implementation — confirm this supplier is never used on that path.
 */
@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,4 @@ public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

/**
 * Supplies a constant attempt number of {@code -1}.
 *
 * NOTE(review): presumably a sentinel for "attempt number not available" — confirm with the readers
 * of this value.
 */
@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> -1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
}

/**
 * Reports that this handle does not attach a block identifier to its log blocks.
 *
 * NOTE(review): this looks like an override of a same-named hook in the parent append handle but
 * carries no {@code @Override} — confirm against the class hierarchy.
 *
 * @return {@code false} in this implementation.
 */
protected boolean addBlockIdentifier() {
return false;
}

@Override
public List<WriteStatus> close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,4 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

/**
 * Supplies a constant attempt number of {@code 0}.
 */
@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,6 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

/**
 * Supplies {@code attemptId} converted to an {@code int}.
 *
 * NOTE(review): {@code attemptId} is declared outside this view; if it is a {@code long}, the cast
 * silently truncates large values — confirm the expected range.
 */
@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> (int)attemptId;
}
}

protected void initFileSystem(String basePath, StorageConfiguration<?> hadoopConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ public Supplier<Long> getAttemptIdSupplier() {
return () -> TaskContext.get().taskAttemptId();
}

/**
 * Supplies the attempt number reported by the current {@code TaskContext}, looked up each time the
 * supplier is invoked.
 */
@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> TaskContext.get().attemptNumber();
}

@Override
public Option<String> getProperty(EngineProperty prop) {
if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,4 @@ public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

/**
 * Supplies a constant attempt number of {@code 0}.
 */
@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,4 @@ public abstract class TaskContextSupplier implements Serializable {
public abstract Supplier<Long> getAttemptIdSupplier();

public abstract Option<String> getProperty(EngineProperty prop);

/**
* Provides a supplier of the attempt number for the task of interest.
*
* @return the attempt number for the task of interest. Attempt starts with 0 and goes up by 1 on retries.
*/
public abstract Supplier<Integer> getAttemptNumberSupplier();
}
Loading

0 comments on commit acebbb8

Please sign in to comment.