[HUDI-6758] Fixing deducing spurious log blocks due to spark retries #9611
Conversation
Some flink tests in TestStreamWriteOperatorCoordinator are failing. Please look into that.
cc @danny0405
Resolved review threads (outdated):
- hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
- hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
- hudi-common/src/main/java/org/apache/hudi/common/engine/LocalTaskContextSupplier.java
If we have not tested it well, can we revert #9545 first? And please do not merge such changes when we do not have enough tests, especially during the release process.
hey @danny0405 : this patch fixes it end to end. I don't see a reason why we need to revert it, though. If we didn't have a solution yet, I would agree we could revert it, but we already have a working solution e2e, and it has been tested with spark retries as well.
Resolved review thread:
- hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
Okay, I'm just scared of any regression because the changes are very core. I know we already have e2e tests manually, but I still think we should give the community some buffer time for testing. Pushing it into 0.14.0 is not a very sensible choice.
Force-pushed from 2d333fd to 3e8fb83
@hudi-bot run azure
```diff
@@ -62,4 +62,9 @@ public Option<String> getProperty(EngineProperty prop) {
     return Option.empty();
   }

+  @Override
+  public Supplier<Integer> getAttemptNumberSupplier() {
+    return () -> -1;
```
In Flink we already have getAttemptIdSupplier, which serves the same purpose. What is its usage in Spark, then? Should we use getAttemptIdSupplier instead?
As of now, I have disabled it for flink. Can you test it out and fix it?
Here is the situation with spark.
Let's say we want to spin up 10 tasks for a stage.
In the first attempt, each task will be assigned a number from 1 to 10 for attemptId, but attemptNumber will be 0 for all of them.
When a subset of the tasks is retried, the new attemptIds will be 11, 12, etc., but attemptNumber will be 1.
This is how it works in spark. I am not very sure about flink. Anyway, with the latest commit, we are avoiding writing the block identifier for flink.
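The numbering described above can be sketched with a small simulation (hypothetical class and method names, purely illustrative; this is not Spark's or Hudi's API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Spark-style task attempt bookkeeping, illustrating
// why attemptNumber (per-task retry count) differs from attemptId
// (globally unique across all attempts in the stage).
class StageAttemptModel {
    private long nextAttemptId = 1;

    static final class TaskAttempt {
        final int taskIndex;      // which of the N tasks in the stage
        final long attemptId;     // unique across every attempt in the stage
        final int attemptNumber;  // 0 for the first try, 1 for the first retry, ...

        TaskAttempt(int taskIndex, long attemptId, int attemptNumber) {
            this.taskIndex = taskIndex;
            this.attemptId = attemptId;
            this.attemptNumber = attemptNumber;
        }
    }

    // Launch one attempt of each listed task; attemptNumber is the retry round.
    List<TaskAttempt> launch(List<Integer> taskIndexes, int attemptNumber) {
        List<TaskAttempt> attempts = new ArrayList<>();
        for (int taskIndex : taskIndexes) {
            attempts.add(new TaskAttempt(taskIndex, nextAttemptId++, attemptNumber));
        }
        return attempts;
    }
}
```

With 10 tasks, the first launch hands out attemptIds 1 to 10 with attemptNumber 0; retrying a subset continues at attemptId 11 while attemptNumber becomes 1, matching the description above.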
So, in summary, the attemptId supplier is not the right one to use, at least in spark. It has to be the attempt number.
Does a spark retry always take the same data set? Is it possible that a retried task goes to another executor/container and takes a different input dataset there?
With regard to log files, it should not, because only updates go to log files.
In case of the bucket index, the record key to fileId mapping is hash based anyway, so we should be good.
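As a rough illustration of the hash-based routing mentioned above (a simplified stand-in, not Hudi's actual bucket index implementation):

```java
// Simplified illustration of hash-based bucket routing: the same record key
// always maps to the same bucket, regardless of which task (or task retry)
// happens to process the record.
class SimpleBucketIndex {
    private final int numBuckets;

    SimpleBucketIndex(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    int bucketFor(String recordKey) {
        // Math.floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(recordKey.hashCode(), numBuckets);
    }
}
```

Because the mapping is a pure function of the key, a retried task that re-processes the same updates writes them to the same file group as the original attempt.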
"only updates go to log files."
That is only true for spark, so you are fixing a bug that depends on Spark's write characteristics.
Yes, we have disabled it for flink as of now. In Java, MOR is not fully functional anyway, from what I know, but I am open to disabling it for Java as well. Mainly it is an issue for ExpressionPayload and any other custom payloads; most of the other payloads are idempotent even if there are duplicate log blocks.
Let's make the Flink impl right first by using this.flinkRuntimeContext.getAttemptNumber()
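For reference, a hedged sketch of what an attempt-number-aware supplier could look like (Flink's RuntimeContext does expose getAttemptNumber(); the wrapper class below is illustrative and not Hudi's actual LocalTaskContextSupplier):

```java
import java.util.function.Supplier;

// Illustrative only: a context supplier that exposes the engine's attempt
// number. In Flink this could delegate to RuntimeContext#getAttemptNumber();
// here a plain int stands in for the runtime so the sketch is self-contained.
class AttemptAwareContextSupplier {
    private final int attemptNumber;

    AttemptAwareContextSupplier(int attemptNumber) {
        this.attemptNumber = attemptNumber;
    }

    // Mirrors the getAttemptNumberSupplier() shape from the diff above,
    // but returns a real attempt number instead of the disabled -1.
    Supplier<Integer> getAttemptNumberSupplier() {
        return () -> attemptNumber;
    }
}
```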
sure, will sync up w/ you directly to better understand this.
https://issues.apache.org/jira/browse/HUDI-6844
Force-pushed from ab1897d to f2cc702
Force-pushed from 5620dc4 to e86aa76
Force-pushed from e86aa76 to 375c15d
@hudi-bot run azure
…9611) - We attempted a fix to avoid reading spurious log blocks on the reader side with #9545. When I tested the patch end to end, I found some gaps. Specifically, the attempt id we had with taskContextSupplier was not referring to the task's attempt number, so this patch fixes it. Tested end to end by simulating spark retries and spurious log blocks; the reader is able to detect them and ignore multiple copies of log blocks.
@nsivabalan Good job.
For the third case, where the whole stage is retried, the task attempt number might go back to the original value. We might need to use …
I would rather we revert this change first if there is no thorough solution, or put a flag for switching it, disabled by default.
hey @beyond1920 : thanks for flagging this. I agree with danny. Will probably revert the change for now.
@nsivabalan hi, I have a question. Why can't we use …
Change Logs
We attempted a fix to avoid reading spurious log blocks on the reader side with #9545.
When I tested the patch end to end, I found some gaps. Specifically, the attempt id we had with taskContextSupplier was not referring to the task's attempt number, so this patch fixes it. Tested end to end by simulating spark retries and spurious log blocks; the reader is able to detect them and ignore multiple copies of log blocks.
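A much-simplified sketch of the reader-side idea (hypothetical types; the real logic lives in AbstractHoodieLogRecordReader and also tracks block sequence numbers): for a given file slice, when log blocks from several task attempts overlap, keep only the blocks written by the highest attempt number and drop the spurious copies from failed attempts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader-side dedup for one file slice: log blocks carry the
// writer's attempt number, and a spark retry rewrites all of that task's
// blocks, so keeping the highest attempt's blocks discards spurious copies.
class LogBlockDeduper {
    static final class LogBlock {
        final int attemptNumber;   // writer task's attempt number (0 = first try)
        final int sequenceNumber;  // position of the block within the attempt

        LogBlock(int attemptNumber, int sequenceNumber) {
            this.attemptNumber = attemptNumber;
            this.sequenceNumber = sequenceNumber;
        }
    }

    static List<LogBlock> dedupe(List<LogBlock> blocks) {
        int maxAttempt = blocks.stream().mapToInt(b -> b.attemptNumber).max().orElse(0);
        List<LogBlock> kept = new ArrayList<>();
        for (LogBlock block : blocks) {
            if (block.attemptNumber == maxAttempt) {
                kept.add(block);
            }
        }
        return kept;
    }
}
```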
Impact
Properly deduce and skip spurious log blocks on the reader
Risk level (write none, low, medium or high below)
medium
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change:
ticket number here and follow the instruction to make changes to the website.
Contributor's checklist