[HUDI-6758] Detecting and skipping Spurious log blocks with MOR reads #9545
Conversation
The approach of using block sequence numbers looks good. For my understanding, when can scenario 4 in the PR description happen?
Also, is it possible to add some e2e test in TestHoodieLogFormat? I guess it might be difficult to intentionally fail a Spark task in a unit test, but please see if it's doable. We can reuse it for some tests too.
// Block Sequence number will be used to detect duplicate log blocks(by log reader) added due to spark task retries.
// It should always start with 0 for a given file slice. for roll overs and delete blocks, we increment by 1.
Suggested change:
// Block sequence number will be used to detect duplicate log blocks (by log reader) added due to spark task retries.
// It should always start with 0 for a given file slice. For rollovers and delete blocks, it is incremented by 1.
@@ -632,6 +635,13 @@ private HoodieLogBlock.HoodieLogBlockType pickLogDataBlockFormat() {
    }
  }

+ private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header, int blockSequenceNumber) {
Just curious, why create a copy? We can simply update the map in place.
@@ -108,8 +110,6 @@ public abstract class AbstractHoodieLogRecordReader {
  private final TypedProperties payloadProps;
  // Log File Paths
  protected final List<String> logFilePaths;
- // Read Lazily flag
- private final boolean readBlocksLazily;
Why is this being removed? Did we switch to lazy reading in all the places?
* Map<String, List<List<Pair<Integer,HoodieLogBlock>>>> blockSequenceMapPerCommit
* Key: Commit time.
* Value: List<List<Pair<Integer,HoodieLogBlock>>>
* Value refers to a List of different block sequences tracked.
*
* For eg, if there were two attempts for a file slice while writing (due to spark task retries), here is how the map might look like
* key: commit1
* value : {
*   entry1: List = { {0, lb1}, {1, lb2} },
*   entry2: List = { {0, lb3}, {1, lb4}, {2, lb5} }
* }
While the comment helps, can we simplify this a bit? Maybe add another private class encapsulating blockSequenceNumber and HoodieLogBlock. It would make reading the code a lot better. Also, we can eliminate the repeated get/containsKey calls by using putIfAbsent.
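A minimal sketch of what this review comment suggests, using hypothetical names (`LogBlock` is a simplified stand-in for HoodieLogBlock, and the class/method names are invented for illustration): a small value class replaces `Pair<Integer, HoodieLogBlock>`, and `putIfAbsent` replaces the containsKey/get/put pattern.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BlockSequenceSketch {
  // Simplified stand-in for HoodieLogBlock.
  static class LogBlock {
    final String id;
    LogBlock(String id) { this.id = id; }
  }

  // Replaces Pair<Integer, HoodieLogBlock>; field names make call sites self-explanatory.
  static class SequencedBlock {
    final int blockSequenceNumber;
    final LogBlock block;
    SequencedBlock(int blockSequenceNumber, LogBlock block) {
      this.blockSequenceNumber = blockSequenceNumber;
      this.block = block;
    }
  }

  // commit time -> list of block sequences (one inner list per write attempt).
  static final Map<String, List<List<SequencedBlock>>> blockSequencesPerCommit = new HashMap<>();

  static void startNewSequence(String instantTime) {
    // putIfAbsent avoids the repeated containsKey/get dance.
    blockSequencesPerCommit.putIfAbsent(instantTime, new ArrayList<>());
    blockSequencesPerCommit.get(instantTime).add(new ArrayList<>());
  }

  static void track(String instantTime, SequencedBlock sb) {
    List<List<SequencedBlock>> sequences = blockSequencesPerCommit.get(instantTime);
    sequences.get(sequences.size() - 1).add(sb); // append to the current attempt's sequence
  }

  public static void main(String[] args) {
    startNewSequence("commit1");
    track("commit1", new SequencedBlock(0, new LogBlock("lb1")));
    track("commit1", new SequencedBlock(1, new LogBlock("lb2")));
    startNewSequence("commit1"); // e.g. a task retry starts a fresh sequence
    track("commit1", new SequencedBlock(0, new LogBlock("lb3")));
    System.out.println(blockSequencesPerCommit.get("commit1").size());
  }
}
```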
blockSequenceMapPerCommit.put(instantTime, curCommitBlockList);
  }
}

private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) { |
Don't we need the logic for scanInternalV2?
}

@Test
public void testAppendsWithSpruiousLogBlocksExactDup() throws IOException, URISyntaxException, InterruptedException {
Suggested change:
public void testAppendsWithSpuriousLogBlocksExactDup() throws IOException, URISyntaxException, InterruptedException {
nit: "spurious" spelling here and in other places in the test.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@nsivabalan Did you consider adding a new command block type which could work as a commit marker? Let's assume commit C1 was to add 2 log blocks to a log file, and that the log file already has the following content (I am assuming appends are enabled on the log file for simplicity here, but this should work with appends disabled too). So now commit C1 will add 2 log blocks resulting in: The issue you have is that Spark stage retries lead to repeated writes of log_block_c1_1 and log_block_c1_2. Let's assume that all writes of log blocks should end with a valid commit command block: log file: [log_block_c0_1, COMMIT_COMMAND_BLOCK_C0, log_block_c1_1, log_block_c1_2, COMMIT_COMMAND_BLOCK_C1]. If a valid commit command block is not found, then the preceding blocks are not valid. If multiple COMMIT_COMMAND_BLOCK_XX are found, then the reader can choose the last one. This idea is similar to how databases use START_COMMIT and END_COMMIT markers in the WAL (write-ahead log).
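A toy sketch of this commit-marker idea, assuming each successful append attempt ends with its own commit command block (the `COMMIT_` naming and block labels are invented for illustration, not Hudi's actual format): the reader keeps only the run of data blocks immediately preceding the last commit marker for a given commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitMarkerSketch {
  // Returns the data blocks validated by the LAST commit marker for the given
  // commit: everything between the previous marker (of any commit) and it.
  static List<String> validBlocks(List<String> log, String commit) {
    int lastMarker = log.lastIndexOf("COMMIT_" + commit);
    if (lastMarker < 0) {
      return List.of(); // no marker found -> preceding blocks for this commit are invalid
    }
    int start = lastMarker - 1;
    while (start >= 0 && !log.get(start).startsWith("COMMIT_")) {
      start--; // walk back to the previous commit marker
    }
    return new ArrayList<>(log.subList(start + 1, lastMarker));
  }

  public static void main(String[] args) {
    // A stage retry re-wrote c1's blocks; the reader picks the run ending at the last COMMIT_c1.
    List<String> log = Arrays.asList(
        "block_c0_1", "COMMIT_c0",
        "block_c1_1", "block_c1_2", "COMMIT_c1",  // first attempt
        "block_c1_1", "block_c1_2", "COMMIT_c1"); // retry
    System.out.println(validBlocks(log, "c1"));
  }
}
```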
Another idea: when a log block has final_block=true, it means that this is the final block for the commit; otherwise it is followed by more blocks. E.g., commit 0 adds a single log block; commit 1 adds 2 log blocks. The logic to detect the completed log blocks may be simpler in this case.
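A sketch of this final_block idea, again with an invented representation (a commit's blocks reduced to their final_block flags): the valid run for a commit is the one ending at the last final_block=true; a retry that also completed would simply append another run ending in final_block=true, and the reader takes the last such run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FinalBlockSketch {
  // Given a commit's blocks in log order (true = final_block), return the
  // indices of the run ending at the LAST final block.
  static List<Integer> validRun(List<Boolean> finalFlags) {
    int last = finalFlags.lastIndexOf(true);
    if (last < 0) {
      return List.of(); // no completed run for this commit
    }
    int prev = last - 1;
    while (prev >= 0 && !finalFlags.get(prev)) {
      prev--; // walk back past the non-final blocks of this run
    }
    List<Integer> run = new ArrayList<>();
    for (int i = prev + 1; i <= last; i++) {
      run.add(i);
    }
    return run;
  }

  public static void main(String[] args) {
    // Commit wrote 2 blocks, then a retry wrote them again: F,T,F,T -> keep indices 2 and 3.
    System.out.println(validRun(Arrays.asList(false, true, false, true)));
  }
}
```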
Each time we add a new metadata type, the table cannot be read with an old Hudi release.
Our expectation is that readers are upgraded first. It's made clear in the release notes of every release.
It's okay if that is already our compatibility routine, but in production, readers tend to be on multiple versions. So keeping read backward compatibility is also important.
…9611) - We attempted a fix to avoid reading spurious log blocks on the reader side with #9545. When I tested the patch end to end, I found some gaps. Specifically, the attempt ID we had with taskContextSupplier was not referring to the task's attempt number, so this patch fixes it. Tested end to end by simulating Spark retries and spurious log blocks; the reader is able to detect them and ignore multiple copies of log blocks.
…apache#9545) - Detect and skip duplicate log blocks due to task retries. - Detection is based on a block sequence number that increases monotonically during rollover.
Change Logs
Due to Spark retries, duplicate log blocks could be added during a write. And since we don't delete any log files during marker-based reconciliation on the writer side, the reader could see duplicated log blocks. For most payload implementations, this should not be an issue. But for expression payload, it could result in data inconsistency, since an expression could be evaluated twice (e.g., colA*2).
Fix
With Spark retries, duplicate log blocks could be created. So we might need to add some kind of block identifier to every block added. We already have a header with key "INSTANT_TIME"; in addition, we can add "BLOCK_SEQUENCE_NO". The block sequence number is a combination of a simple counter, which increments from 1 and keeps increasing during rollover for a given file slice (append handle), and the attempt number. On Spark retries, we again start with a sequence number of 1 for the given file slice, but the attempt number will have been incremented by 1. Because, for a given commit and a given file slice, the entire file slice will be attempted again on Spark task or stage retries, just the block sequence number and attempt number suffice. For example, if a file slice is supposed to get 3 log files due to log file size rollovers, and during the first attempt only 2 log files were created, then in the 2nd attempt all 3 log files will have to be re-created.
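The writer-side idea above can be sketched as follows. This is a hypothetical simplification, not the actual HoodieAppendHandle code: header keys are plain strings here (in Hudi they are HeaderMetadataType enum values), the combined value encoding is invented, and the counter starts at 0 per the scheme the scenarios below use.

```java
import java.util.HashMap;
import java.util.Map;

public class BlockHeaderSketch {
  // Per-file-slice counter; restarts at 0 on every (re)attempt of the file slice.
  static int blockSequenceNo = 0;

  // Builds the headers for the next log block of a file slice.
  static Map<String, String> newBlockHeader(String instantTime, int attemptNumber) {
    Map<String, String> header = new HashMap<>();
    header.put("INSTANT_TIME", instantTime);
    // attemptNumber disambiguates retries; the counter orders blocks within one attempt.
    header.put("BLOCK_SEQUENCE_NO", attemptNumber + "," + blockSequenceNo);
    blockSequenceNo++; // bumped on rollover to the next log block/file
    return header;
  }

  public static void main(String[] args) {
    System.out.println(newBlockHeader("commit1", 0).get("BLOCK_SEQUENCE_NO"));
    System.out.println(newBlockHeader("commit1", 0).get("BLOCK_SEQUENCE_NO"));
  }
}
```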
On the reader side, if duplicate blocks are detected with the same commit time and the same block sequence number, we should find the longest sequence and pick it when more than one sequence is found. Corrupt blocks should be ignored during this reconciliation as well. We should also account for backwards compatibility, where the block sequence number may not be present. For rollback blocks, we can statically set the sequence number to 1 since there should be only one log block per file slice.
Different scenarios:
As called out above, we will go with a simple Block_Sequence_No which starts with 0 and increments for a given file slice. The major crux here is how the reader will reconcile if duplicate log files were created due to Spark task failures. The high-level reconciliation logic for the reader is as follows: the reader finds the sequences of BSNs (block sequence numbers) and picks the longest one. So we do one pass over every log block (which we already do as of today) to parse the header info, without deserializing the actual content, and determine the right ones (ignoring the spurious log blocks).
Scenario 1:
Happy path, no retries.
log_file1 (BSN: 0), log_file2 (BSN: 1) added. All of them have attempt no = 0.
Reader:
Finds only one BSN sequence, (0, 1), and marks both log_file1 and log_file2 as valid.
Scenario 2:
Task failed and retried, where the 1st attempt did not create the entire list of log files.
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added. Attempt no = 0.
attempt2:
log_file3 (BSN: 0), log_file4 (BSN: 1), log_file5 (BSN: 2) added. // BSN starts with 0 every time for a given file slice. Attempt no = 1.
Reader:
Finds two sequences of block sequence numbers:
{0, (0,1)} and {1, (0,1,2)}. So it chooses lf3, lf4, and lf5 as valid and ignores lf1 and lf2 as spurious.
Scenario 3:
Task failed and retried, where both attempts created the full list (maybe Spark speculation).
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added. Attempt no = 0.
attempt2:
log_file3 (BSN: 0), log_file4 (BSN: 1) added. Attempt no = 1.
Reader:
Finds two sequences of block sequence numbers:
{0, (0,1)} and {1, (0,1)}. So it chooses lf3 and lf4 as valid and ignores lf1 and lf2 as spurious. We pick the latest set if both sets have the same sequence.
Scenario 4:
Task failed and retried, where the 1st attempt has the full list of log files.
attempt1:
log_file1 (BSN: 0), log_file2 (BSN: 1) added. Attempt no = 0.
attempt2:
log_file3 (BSN: 0) added. Attempt no = 1.
Reader:
Finds two sequences of block sequence numbers:
{0, (0,1)} and {1, (0)}. So it chooses lf1 and lf2 as valid and ignores lf3 as spurious.
The same logic should work for HDFS as well. Since log blocks are ordered per log file name with versions, the ordering should be intact, i.e., log files/log blocks created by the 2nd attempt should not interleave with log blocks from the 1st attempt. If log blocks are within the same log file, the blocks will be ordered for sure; if they span log files, the log version number ordering should suffice (which already happens).
Scenario 3 with HDFS:
Task failed and retried, where both attempts created the full list (maybe Spark speculation), but one of the attempts has partial writes (corrupt blocks).
attempt1:
log_file1, block1 (BSN: 0); log_file1, block2 (BSN: 1), but a corrupt block due to a partial write. Attempt no = 0.
attempt2:
log_file1, block3 (BSN: 0); log_file2, block4 (BSN: 1) added. Attempt no = 1.
Reader:
Finds two sequences of block sequence numbers.
Since lf1, block2 is a corrupt block, the valid sequences deduced are (0) and (0,1). So we choose lf1, block3 and lf2, block4 as valid and ignore lf1, block1 and lf1, block2 as invalid.
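The reconciliation rule running through the scenarios above can be sketched in a few lines. This is an illustrative simplification, not the actual Hudi reader: blocks are reduced to their BSN sequences, grouped by attempt in attempt order, and the longest sequence wins (preferring the later attempt on ties, as in scenario 3).

```java
import java.util.Arrays;
import java.util.List;

public class BsnReconcileSketch {
  // Each inner list is one attempt's BSN sequence, in attempt order.
  // Returns the index of the attempt whose blocks the reader should keep.
  static int pickValidAttempt(List<List<Integer>> attempts) {
    int best = 0;
    for (int i = 1; i < attempts.size(); i++) {
      if (attempts.get(i).size() >= attempts.get(best).size()) {
        best = i; // >= prefers the later attempt when lengths tie (scenario 3)
      }
    }
    return best;
  }

  public static void main(String[] args) {
    // Scenario 2: the retry produced the longer (complete) sequence -> attempt 1 wins.
    System.out.println(pickValidAttempt(Arrays.asList(
        Arrays.asList(0, 1), Arrays.asList(0, 1, 2))));
    // Scenario 4: the first attempt had the full list -> attempt 0 wins.
    System.out.println(pickValidAttempt(Arrays.asList(
        Arrays.asList(0, 1), Arrays.asList(0))));
  }
}
```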
Impact
The MOR log record reader will never read duplicated log blocks created due to Spark task retries during write.
Risk level (write none, low medium or high below)
medium.
Documentation Update
N/A
Contributor's checklist