[HUDI-7549] Reverting spurious log block deduction with LogRecordReader #10922

Merged
merged 5 commits into from
May 14, 2024
Changes from all commits
@@ -57,7 +57,6 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
@@ -130,11 +129,6 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   private boolean useWriterSchema = false;
 
   private final Properties recordProperties = new Properties();
-  // Block Sequence number will be used to detect duplicate log blocks(by log reader) added due to spark task retries.
-  // It should always start with 0 for a given file slice. for rolling-over and delete blocks, we increment by 1.
-  private int blockSequenceNumber = 0;
-  // On task failures, a given task could be retried. So, this attempt number will track the number of attempts.
-  private int attemptNumber = 0;
 
   /**
    * This is used by log compaction only.
@@ -146,7 +140,6 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
     this.useWriterSchema = true;
     this.isLogCompaction = true;
     this.header.putAll(header);
-    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -163,7 +156,6 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
-    this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
     this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
   }
 
@@ -470,14 +462,12 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
           : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
       blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, shouldWriteRecordPositions,
-          getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
-              addBlockIdentifier()), keyField));
+          getUpdatedHeader(header, config), keyField));
     }
 
     if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
       blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions, shouldWriteRecordPositions,
-          getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
-              addBlockIdentifier())));
+          getUpdatedHeader(header, config)));
     }
 
     if (blocks.size() > 0) {
@@ -576,10 +566,6 @@ protected boolean needsUpdateLocation() {
     return true;
   }
 
-  protected boolean addBlockIdentifier() {
-    return true;
-  }
-
   private void writeToBuffer(HoodieRecord<T> record) {
     if (!partitionPath.equals(record.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -654,12 +640,9 @@ private HoodieLogBlock.HoodieLogBlockType pickLogDataBlockFormat() {
     }
   }
 
-  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber,
-                                                                  HoodieWriteConfig config, boolean addBlockIdentifier) {
+  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header,
+                                                                  HoodieWriteConfig config) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(header);
-    if (addBlockIdentifier && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block sequence numbers only for data table.
-      updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber + "," + blockSequenceNumber);
-    }
     if (config.shouldWritePartialUpdates()) {
       // When enabling writing partial updates to the data blocks, the "IS_PARTIAL" flag is also
       // written to the block header so that the reader can differentiate partial updates, i.e.,
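For context, here is a minimal sketch of the mechanism this hunk reverts, reconstructed from the removed lines above (an illustration, not authoritative Hudi source). Before this PR, every data and delete block written to a data table carried a BLOCK_IDENTIFIER header of the form "<attemptNumber>,<blockSequenceNumber>", which the log record reader used to deduce duplicate blocks left behind by retried Spark tasks:

  // Sketch of the pre-revert header stamping, mirroring the removed lines.
  private static Map<HeaderMetadataType, String> getUpdatedHeader(
      Map<HeaderMetadataType, String> header, int blockSequenceNumber,
      long attemptNumber, HoodieWriteConfig config, boolean addBlockIdentifier) {
    Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(header);
    // Identifiers were stamped only on the data table, never on the metadata
    // table, and encoded as "<attemptNumber>,<blockSequenceNumber>".
    if (addBlockIdentifier && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
      updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, attemptNumber + "," + blockSequenceNumber);
    }
    return updatedHeader;
  }

After the revert, getUpdatedHeader takes only the header and the write config, so no block identifier is written and the reader-side duplicate-block deduction it fed no longer applies.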
@@ -45,9 +45,4 @@ public Supplier<Long> getAttemptIdSupplier() {
   public Option<String> getProperty(EngineProperty prop) {
     return null;
   }
-
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return null;
-  }
 }
@@ -62,9 +62,4 @@ public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
-
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> -1;
-  }
 
 }
@@ -92,10 +92,6 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
       && hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
   }
 
-  protected boolean addBlockIdentifier() {
-    return false;
-  }
-
   @Override
   public List<WriteStatus> close() {
     try {
@@ -44,10 +44,4 @@ public Supplier<Long> getAttemptIdSupplier() {
   public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
-
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> 0;
-  }
-
 }
@@ -180,11 +180,6 @@ public Supplier<Long> getAttemptIdSupplier() {
     public Option<String> getProperty(EngineProperty prop) {
       return Option.empty();
     }
-
-    @Override
-    public Supplier<Integer> getAttemptNumberSupplier() {
-      return () -> (int)attemptId;
-    }
   }
 
   protected void initFileSystem(String basePath, StorageConfiguration<?> hadoopConf) {
@@ -50,11 +50,6 @@ public Supplier<Long> getAttemptIdSupplier() {
     return () -> TaskContext.get().taskAttemptId();
   }
 
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> TaskContext.get().attemptNumber();
-  }
-
   @Override
   public Option<String> getProperty(EngineProperty prop) {
     if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
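To see what the removed supplier actually returned on Spark, a condensed sketch of the plumbing being deleted (the method bodies are taken from the diff; the surrounding class is elided):

  // Spark's TaskContext exposes attemptNumber(), which starts at 0 and
  // increments on each task retry -- this is what block identifiers encoded.
  @Override
  public Supplier<Integer> getAttemptNumberSupplier() {
    return () -> TaskContext.get().attemptNumber();
  }

  // And the (now deleted) call site in HoodieAppendHandle's constructors:
  this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();

Note the distinction with getAttemptIdSupplier(), which survives the revert: taskAttemptId() is a globally unique id for the task attempt, whereas attemptNumber() is the per-task retry count that the duplicate-block deduction keyed on.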
@@ -46,9 +46,4 @@ public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }
-
-  @Override
-  public Supplier<Integer> getAttemptNumberSupplier() {
-    return () -> 0;
-  }
-
 }
@@ -35,9 +35,4 @@ public abstract class TaskContextSupplier implements Serializable {
   public abstract Supplier<Long> getAttemptIdSupplier();
 
   public abstract Option<String> getProperty(EngineProperty prop);
-
-  /**
-   * @returns the attempt number for the task of interest. Attempt starts with 0 and goes up by 1 on retries.
-   */
-  public abstract Supplier<Integer> getAttemptNumberSupplier();
 }
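Putting the abstract-class hunk together, this is the engine-agnostic contract left after the revert (a sketch showing only the members visible in this excerpt; the real class declares other suppliers not shown in the diff):

  public abstract class TaskContextSupplier implements Serializable {
    public abstract Supplier<Long> getAttemptIdSupplier();

    public abstract Option<String> getProperty(EngineProperty prop);

    // getAttemptNumberSupplier() is gone: engines no longer surface the
    // per-task retry count, since log blocks are no longer stamped with it.
  }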