[HUDI-6758] Fixing deducing spurious log blocks due to spark retries #9611

Merged: 5 commits, Sep 10, 2023
@@ -53,6 +53,7 @@
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
@@ -129,6 +130,8 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
// Block sequence number will be used to detect duplicate log blocks (by the log reader) added due to Spark task retries.
// It should always start with 0 for a given file slice. For rollovers and delete blocks, we increment by 1.
private int blockSequenceNumber = 0;
// On task failures, a given task could be retried. So, this attempt number will track the number of attempts.
private int attemptNumber = 0;

/**
* This is used by log compaction only.
@@ -140,6 +143,7 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
this.useWriterSchema = true;
this.isLogCompaction = true;
this.header.putAll(header);
this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
}

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -150,6 +154,7 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
}

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -454,11 +459,13 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();

blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get()), keyField));
blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
addBlockIdentifier()), keyField));
}

if (appendDeleteBlocks && recordsToDelete.size() > 0) {
blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get())));
blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
addBlockIdentifier())));
}

if (blocks.size() > 0) {
@@ -555,6 +562,10 @@ protected boolean needsUpdateLocation() {
return true;
}

protected boolean addBlockIdentifier() {
return true;
}

private void writeToBuffer(HoodieRecord<T> record) {
if (!partitionPath.equals(record.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -628,10 +639,13 @@ private HoodieLogBlock.HoodieLogBlockType pickLogDataBlockFormat() {
}
}

private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber) {
private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber, long attemptNumber,
HoodieWriteConfig config, boolean addBlockIdentifier) {
Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
updatedHeader.putAll(header);
updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber));
if (addBlockIdentifier && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) { // add block sequence numbers only for data table.
updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(attemptNumber) + "," + String.valueOf(blockSequenceNumber));
}
return updatedHeader;
}

@@ -45,4 +45,9 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return null;
}

@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return null;
}
}
@@ -62,4 +62,9 @@ public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> -1;

Contributor:
In Flink we already have getAttemptIdSupplier, which serves the same purpose. What is the use of this for Spark then? Should we use getAttemptIdSupplier instead?

Contributor (Author):
As of now, I have disabled it for Flink; can you test it out and fix it?

Here is the situation with Spark.

Let's say we want to spin up 10 tasks for a stage. On the first attempt, each task will be assigned a number from 1 to 10 as its attemptId, but its attemptNumber will be 0. When a subset of the tasks is retried, the new attemptIds will be 11, 12, etc., but the attemptNumber will be 1.

This is how it works in Spark; I am not very sure about Flink. Anyway, with the latest commit, we avoid writing the block identifier for Flink.
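
To make the distinction concrete, here is a minimal sketch (not part of this PR; the class and method names are illustrative only) that prints both identifiers from inside a Spark task. TaskContext.taskAttemptId() and TaskContext.attemptNumber() are the Spark APIs referenced above:

```java
import org.apache.spark.TaskContext;

// Illustrative only: shows why attemptNumber, not taskAttemptId, identifies a retry of the same task.
public class SparkAttemptIds {
  public static String describeCurrentAttempt() {
    TaskContext ctx = TaskContext.get(); // non-null only inside a running Spark task
    // taskAttemptId() is a unique, ever-growing id across all task attempts in the application,
    // while attemptNumber() is 0 on a task's first run and increments only when that task is retried.
    return "partition=" + ctx.partitionId()
        + ", taskAttemptId=" + ctx.taskAttemptId()
        + ", attemptNumber=" + ctx.attemptNumber();
  }
}
```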

Contributor (Author):
So, in summary, the attemptId supplier is not the right one to use, at least in Spark. It has to be the attempt number.

Contributor:
Does a Spark retry always take the same dataset? Is it possible that a retried task goes to another executor/container and takes a different input dataset there?

Contributor (Author):
With respect to log files, it should not, because only updates go to log files. In the case of bucket index, the record key to fileId mapping is hash based anyway, so we should be good.

Contributor:
> only updates go to log files.

That is only true for Spark, so you are fixing a bug that depends on a Spark write characteristic.

Contributor (Author) (nsivabalan, Sep 9, 2023):
Yes. We have disabled it for Flink as of now. In Java, MOR is not fully functional from what I know, but I am open to disabling it for Java as well. Mainly it is an issue for ExpressionPayload and any other custom payloads; most of the other payloads are idempotent even if there are duplicate log blocks.

Contributor:
Let's make the Flink impl right first by using this.flinkRuntimeContext.getAttemptNumber().
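
A rough sketch of what that Flink-side change might look like, assuming the supplier has a Flink RuntimeContext available (the class and field below are illustrative, not the actual Hudi implementation); RuntimeContext.getAttemptNumber() is the standard Flink API and, like Spark's attemptNumber, starts at 0 and increments on restarts:

```java
import java.util.function.Supplier;

import org.apache.flink.api.common.functions.RuntimeContext;

// Sketch only: a hypothetical attempt-number supplier backed by Flink's RuntimeContext.
public class FlinkAttemptNumberSketch {
  private final RuntimeContext flinkRuntimeContext;

  public FlinkAttemptNumberSketch(RuntimeContext flinkRuntimeContext) {
    this.flinkRuntimeContext = flinkRuntimeContext;
  }

  public Supplier<Integer> getAttemptNumberSupplier() {
    // 0 for the first execution attempt, incremented on each restart of the subtask.
    return () -> flinkRuntimeContext.getAttemptNumber();
  }
}
```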

Contributor (Author):
Sure, will sync up with you directly to better understand this.
https://issues.apache.org/jira/browse/HUDI-6844
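
For readers following the thread: the block identifier written by this PR is simply the pair "<attemptNumber>,<blockSequenceNumber>". A small self-contained sketch of that round trip (helper names here are illustrative, not Hudi APIs):

```java
// Sketch only: encode/decode of the BLOCK_IDENTIFIER header value introduced by this PR.
public class BlockIdentifierSketch {

  static String encode(long attemptNumber, int blockSequenceNumber) {
    return attemptNumber + "," + blockSequenceNumber;
  }

  static long[] decode(String blockIdentifier) {
    String[] parts = blockIdentifier.split(",");
    // parts[0]: attempt number of the writing task; parts[1]: sequence of the block within that attempt
    return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
  }

  public static void main(String[] args) {
    String headerValue = encode(1, 2); // second attempt (attempts start at 0), third block in that attempt
    long[] decoded = decode(headerValue);
    System.out.println(headerValue + " -> attemptNumber=" + decoded[0] + ", blockSequenceNumber=" + decoded[1]);
  }
}
```

On the read path, the scanner groups blocks per commit by attempt number and keeps the attempt that produced the most blocks, treating blocks from the other attempts as spurious.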

}

}
@@ -92,6 +92,10 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
}

protected boolean addBlockIdentifier() {
return false;
}

@Override
public List<WriteStatus> close() {
try {
@@ -44,4 +44,10 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> 0;
}

}
@@ -184,6 +184,11 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> (int)attemptId;
}
}

protected void initFileSystem(String basePath, Configuration hadoopConf) {
@@ -50,6 +50,11 @@ public Supplier<Long> getAttemptIdSupplier() {
return () -> TaskContext.get().taskAttemptId();
}

@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> TaskContext.get().attemptNumber();
}

@Override
public Option<String> getProperty(EngineProperty prop) {
if (prop == EngineProperty.TOTAL_MEMORY_AVAILABLE) {
@@ -89,4 +94,5 @@ public Option<String> getProperty(EngineProperty prop) {
}
throw new HoodieException("Unknown engine property :" + prop);
}

}
@@ -45,4 +45,10 @@ public Supplier<Long> getAttemptIdSupplier() {
public Option<String> getProperty(EngineProperty prop) {
return Option.empty();
}

@Override
public Supplier<Integer> getAttemptNumberSupplier() {
return () -> 0;
}

}
@@ -35,4 +35,9 @@ public abstract class TaskContextSupplier implements Serializable {
public abstract Supplier<Long> getAttemptIdSupplier();

public abstract Option<String> getProperty(EngineProperty prop);

/**
* @return the attempt number for the task of interest. The attempt number starts at 0 and goes up by 1 on retries.
*/
public abstract Supplier<Integer> getAttemptNumberSupplier();
}
@@ -61,12 +61,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_SEQUENCE_NUMBER;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME;
@@ -225,6 +226,7 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
currentInstantLogBlocks = new ArrayDeque<>();
List<HoodieLogBlock> validLogBlockInstants = new ArrayList<>();
Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit = new HashMap<>();
AtomicBoolean blockIdentifiersPresent = new AtomicBoolean(false);

progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -251,13 +253,13 @@
// Use the HoodieLogFileReader to iterate through the blocks in the log file
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
final String blockSequenceNumberStr = logBlock.getLogBlockHeader().getOrDefault(BLOCK_SEQUENCE_NUMBER, "");
int blockSeqNo = -1;
long attemptNo = -1L;
if (!StringUtils.isNullOrEmpty(blockSequenceNumberStr)) {
String[] parts = blockSequenceNumberStr.split(",");
attemptNo = Long.parseLong(parts[0]);
blockSeqNo = Integer.parseInt(parts[1]);
final String blockIdentifier = logBlock.getLogBlockHeader().getOrDefault(BLOCK_IDENTIFIER, StringUtils.EMPTY_STRING);
int blockSeqNumber = -1;
long attemptNumber = -1L;
if (!StringUtils.isNullOrEmpty(blockIdentifier)) {
String[] parts = blockIdentifier.split(",");
attemptNumber = Long.parseLong(parts[0]);
blockSeqNumber = Integer.parseInt(parts[1]);
}
totalLogBlocks.incrementAndGet();
if (logBlock.getBlockType() != CORRUPT_BLOCK
@@ -285,14 +287,14 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
// store the current block
currentInstantLogBlocks.push(logBlock);
validLogBlockInstants.add(logBlock);
updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file " + logFile.getPath());
// store deletes so can be rolled back
currentInstantLogBlocks.push(logBlock);
validLogBlockInstants.add(logBlock);
updateBlockSequenceTracker(logBlock, instantTime, blockSeqNo, attemptNo, blockSequenceMapPerCommit);
updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent);
break;
case COMMAND_BLOCK:
// Consider the following scenario
@@ -383,14 +385,19 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
if (dedupedLogBlocksInfo.getKey()) {
// if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
currentInstantLogBlocks = new ArrayDeque<>();
dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
} else {
boolean duplicateBlocksDetected = false;
if (blockIdentifiersPresent.get()) {
Pair<Boolean, List<HoodieLogBlock>> dedupedLogBlocksInfo = reconcileSpuriousBlocksAndGetValidOnes(validLogBlockInstants, blockSequenceMapPerCommit);
duplicateBlocksDetected = dedupedLogBlocksInfo.getKey();
if (duplicateBlocksDetected) {
// if there are duplicate log blocks that needs to be removed, we re-create the queue for valid log blocks from dedupedLogBlocks
currentInstantLogBlocks = new ArrayDeque<>();
dedupedLogBlocksInfo.getValue().forEach(block -> currentInstantLogBlocks.push(block));
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
}
if (!duplicateBlocksDetected) {
// if there are no dups, we can take currentInstantLogBlocks as is.
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
@@ -429,30 +436,33 @@ private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOn

boolean dupsFound = blockSequenceMapPerCommit.values().stream().anyMatch(perCommitBlockList -> perCommitBlockList.size() > 1);
if (dupsFound) {
if (LOG.isDebugEnabled()) {
logBlockSequenceMapping(blockSequenceMapPerCommit);
}

// duplicates are found. we need to remove duplicate log blocks.
for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry: blockSequenceMapPerCommit.entrySet()) {
Map<Long, List<Pair<Integer, HoodieLogBlock>>> perCommitBlockSequences = entry.getValue();
if (perCommitBlockSequences.size() > 1) {
// only those that have more than 1 sequence needs deduping.
int maxSequenceCount = -1;
int maxAttemptNo = -1;
int totalSequences = perCommitBlockSequences.size();
int counter = 0;
for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
Long attemptNo = perAttemptEntries.getKey();
int size = perAttemptEntries.getValue().size();
if (maxSequenceCount < size) {
if (maxSequenceCount <= size) {
maxSequenceCount = size;
maxAttemptNo = Math.toIntExact(attemptNo);
}
counter++;
}
// for other sequence (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks
// for other sequences (!= maxSequenceIndex), we need to remove the corresponding logBlocks from allValidLogBlocks
for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) {
Long attemptNo = perAttemptEntries.getKey();
if (maxAttemptNo != attemptNo) {
List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(pair -> pair.getValue()).collect(Collectors.toList());
logBlocksToRemove.forEach(logBlockToRemove -> allValidLogBlocks.remove(logBlocksToRemove));
logBlocksToRemove.forEach(logBlockToRemove -> {
allValidLogBlocks.remove(logBlockToRemove);
});
}
}
}
@@ -463,6 +473,21 @@ private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOn
}
}

private void logBlockSequenceMapping(Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
LOG.warn("Duplicate log blocks found ");
for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) {
if (entry.getValue().size() > 1) {
LOG.warn("\tCommit time " + entry.getKey());
Map<Long, List<Pair<Integer, HoodieLogBlock>>> value = entry.getValue();
for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq : value.entrySet()) {
LOG.warn("\t\tAttempt number " + attemptsSeq.getKey());
attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog block sequence no : " + entryValue.getKey() + ", log file "
+ entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString()));
}
}
}
}

/**
* Updates map tracking block seq no.
* Here is the map structure.
@@ -483,30 +508,32 @@ private Pair<Boolean, List<HoodieLogBlock>> reconcileSpuriousBlocksAndGetValidOn
*
* @param logBlock log block of interest to be added.
* @param instantTime commit time of interest.
* @param blockSeqNo block sequence number.
* @param blockSeqNumber block sequence number.
* @param blockSequenceMapPerCommit map tracking per commit block sequences.
*/
private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNo, long attemptNo,
Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit) {
if (blockSeqNo != -1 && attemptNo != -1) { // update the block sequence tracker for log blocks containing the same.
private void updateBlockSequenceTracker(HoodieLogBlock logBlock, String instantTime, int blockSeqNumber, long attemptNumber,
Map<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> blockSequenceMapPerCommit,
AtomicBoolean blockIdentifiersPresent) {
if (blockSeqNumber != -1 && attemptNumber != -1) { // update the block sequence tracker for log blocks containing the same.
blockIdentifiersPresent.set(true);
blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
if (curCommitBlockMap.containsKey(attemptNo)) {
if (curCommitBlockMap.containsKey(attemptNumber)) {
// append to existing map entry
curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock));
} else {
// create a new map entry
curCommitBlockMap.put(attemptNo, new ArrayList<>());
curCommitBlockMap.get(attemptNo).add(Pair.of(blockSeqNo, logBlock));
curCommitBlockMap.put(attemptNumber, new ArrayList<>());
curCommitBlockMap.get(attemptNumber).add(Pair.of(blockSeqNumber, logBlock));
}
// update the latest to block sequence tracker
blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
} else {
// all of older blocks are considered valid. there should be only one list for older commits where block sequence number is not present.
blockSequenceMapPerCommit.computeIfAbsent(instantTime, entry -> new HashMap<>());
Map<Long, List<Pair<Integer, HoodieLogBlock>>> curCommitBlockMap = blockSequenceMapPerCommit.get(instantTime);
curCommitBlockMap.put(0L, new ArrayList<>());
curCommitBlockMap.get(0L).add(Pair.of(blockSeqNo, logBlock));
curCommitBlockMap.computeIfAbsent(0L, entry -> new ArrayList<>());
curCommitBlockMap.get(0L).add(Pair.of(blockSeqNumber, logBlock));
// update the latest to block sequence tracker
blockSequenceMapPerCommit.put(instantTime, curCommitBlockMap);
}
@@ -168,7 +168,7 @@ public static HoodieLogBlockType fromId(String id) {
* new enums at the end.
*/
public enum HeaderMetadataType {
INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_SEQUENCE_NUMBER
INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA, COMMAND_BLOCK_TYPE, COMPACTED_BLOCK_TIMES, RECORD_POSITIONS, BLOCK_IDENTIFIER
}

/**
@@ -2915,7 +2915,7 @@ private static Set<HoodieLogFile> writeLogFiles(Path partitionPath,
private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
Map<HeaderMetadataType, String> updatedHeader = new HashMap<>();
updatedHeader.putAll(header);
updatedHeader.put(HeaderMetadataType.BLOCK_SEQUENCE_NUMBER, String.valueOf(blockSequenceNumber));
updatedHeader.put(HeaderMetadataType.BLOCK_IDENTIFIER, String.valueOf(blockSequenceNumber));
return updatedHeader;
}
