
[HUDI-6758] Fixing deducing spurious log blocks due to spark retries #9611

Merged: 5 commits into apache:master on Sep 10, 2023

Conversation

nsivabalan
Contributor

Change Logs

We attempted a fix to avoid reading spurious log blocks on the reader side with #9545.
When I tested the patch end to end, I found some gaps. Specifically, the attempt ID we got from taskContextSupplier was not referring to the task's attempt number, so this patch fixes it. Tested end to end by simulating Spark retries and spurious log blocks; the reader is able to detect them and ignore the duplicate copies of log blocks.

Impact

Properly deduce and skip spurious log blocks on the reader side

Risk level (write none, low, medium or high below)

medium

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change:

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

Member

@codope codope left a comment


Some Flink tests in TestStreamWriteOperatorCoordinator are failing. Please look into that.
cc @danny0405

@danny0405
Contributor

When I tested the patch end to end, I found some gaps. Specifically, the attempt ID we got from taskContextSupplier was not referring to the task's attempt number, so this patch fixes it.

If we have not tested it well, can we revert #9545 first? Please do not merge such changes when we do not have enough tests, especially during the release process.

@nsivabalan
Contributor Author

Hey @danny0405: this patch fixes it end to end, so I don't see a reason why we need to revert it. If we didn't have a solution yet, I would agree we should revert, but we already have a working solution that has been tested end to end with Spark retries as well.

@danny0405
Contributor

a working solution that has been tested end to end with Spark retries as well.

Okay, I'm just worried about regressions because the changes are very core. I know we already have manual e2e tests, but I still think we should give the community some buffer time for testing. Pushing it into 0.14.0 is not a very sensible choice.

@nsivabalan force-pushed the testMorSpuriousLogs branch 2 times, most recently from 2d333fd to 3e8fb83, on September 7, 2023
@nsivabalan
Contributor Author

@hudi-bot run azure

@@ -62,4 +62,9 @@ public Option<String> getProperty(EngineProperty prop) {
    return Option.empty();
  }

  @Override
  public Supplier<Integer> getAttemptNumberSupplier() {
    return () -> -1;
  }
Contributor


In Flink we already have getAttemptIdSupplier, which serves the same purpose. What is the usage in Spark then? Should we use getAttemptIdSupplier instead?

Contributor Author


As of now, I have disabled it for Flink. Can you test it out and fix it?

Here is the situation with Spark.

Let's say we want to spin up 10 tasks for a stage. In the first attempt, each task will be assigned a number from 1 to 10 as its attemptId, but its attemptNumber will be 0. When a subset of tasks is retried, the new attemptIds will be 11, 12, etc., but the attemptNumber will be 1.

This is how it works in Spark. I am not very sure about Flink. Anyway, with the latest commit, we avoid writing the block identifier for Flink.
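For illustration only, here is a minimal sketch (not the exact code from this PR) of how a Spark-side attempt-number supplier could be wired up; it relies on Spark's public TaskContext API, where attemptNumber() starts at 0 and increments on each retry of the same task, while taskAttemptId() is unique per attempt across the whole application:

    import java.util.function.Supplier;
    import org.apache.spark.TaskContext;

    public class SparkAttemptNumberSupplierExample {
      // Hypothetical helper: expose the current task's attempt number.
      public static Supplier<Integer> getAttemptNumberSupplier() {
        return () -> {
          TaskContext ctx = TaskContext.get();
          // No context means we are not inside a running Spark task;
          // fall back to -1, mirroring the engines where this is disabled.
          return ctx == null ? -1 : ctx.attemptNumber();
        };
      }
    }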

Contributor Author


So, in summary, the attemptId supplier is not the right one to use, at least in Spark. It has to be the attempt number.

Contributor


Does a Spark retry always take the same dataset? Is it possible that a retried task goes to another executor/container and takes a different input dataset there?

Contributor Author


With respect to log files, it should not, because only updates go to log files.
In the case of bucket index, the record-key-to-fileId mapping is hash based anyway, so we should be good.

Contributor


only updates go to log files.

That is only true for Spark, so you are fixing a bug that depends on a Spark write characteristic.

Contributor Author

@nsivabalan commented Sep 9, 2023


Yes, we have disabled it for Flink as of now. In Java, MOR is not fully functional anyway, from what I know, but I am open to disabling it for Java as well. Mainly it is an issue for ExpressionPayload and any other custom payloads; most of the other payloads are idempotent even if there are duplicate log blocks.

Contributor


Let's make the Flink impl right first by using this.flinkRuntimeContext.getAttemptNumber()
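For reference, a rough sketch of what this suggestion might look like in the Flink engine context; this is not the PR's code, and it assumes Flink's RuntimeContext#getAttemptNumber(), which starts at 0 and increments each time the subtask is restarted:

    import java.util.function.Supplier;
    import org.apache.flink.api.common.functions.RuntimeContext;

    public class FlinkAttemptNumberSupplierExample {
      private final RuntimeContext flinkRuntimeContext;

      public FlinkAttemptNumberSupplierExample(RuntimeContext flinkRuntimeContext) {
        this.flinkRuntimeContext = flinkRuntimeContext;
      }

      // Hypothetical supplier wiring, mirroring the Spark-side fix.
      public Supplier<Integer> getAttemptNumberSupplier() {
        return flinkRuntimeContext::getAttemptNumber;
      }
    }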

Contributor Author


Sure, I will sync up with you directly to better understand this.
https://issues.apache.org/jira/browse/HUDI-6844


@nsivabalan
Contributor Author

@hudi-bot run azure

@hudi-bot

hudi-bot commented Sep 9, 2023

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan nsivabalan merged commit dd4c404 into apache:master Sep 10, 2023
28 checks passed
prashantwason pushed a commit that referenced this pull request Sep 13, 2023
leosanqing pushed a commit to leosanqing/hudi that referenced this pull request Sep 13, 2023
@beyond1920
Contributor

beyond1920 commented Mar 24, 2024

@nsivabalan Good job.
We found a minor drawback.
There are 4 cases in which a task is retried:

  1. The task is slow, and a speculative copy of the task is launched
  2. The task failed and is retried
  3. The stage failed and is retried
  4. The executor failed and the task is retried

For the third case, where the stage is retried, the task attempt number might go back to its original value.
Using the attempt number alone to identify a block is not enough to handle this case; it might lead to wrong results when comparing the block sizes of each attempt number.

We might need to use stageAttemptNumber together with attemptNumber to identify it, or some other solution. WDYT?
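Purely as an illustration of this suggestion (not code from this PR or a committed fix), a sketch that combines the stage attempt number with the task attempt number via Spark's TaskContext, so that a stage retry, which resets attemptNumber() back to 0, still yields a distinct identifier:

    import org.apache.spark.TaskContext;

    public final class BlockAttemptIdExample {
      private BlockAttemptIdExample() {}

      // Hypothetical identifier of the form "<stageAttemptNumber>_<taskAttemptNumber>".
      public static String currentBlockAttemptId() {
        TaskContext ctx = TaskContext.get();
        if (ctx == null) {
          return "-1_-1"; // not inside a running Spark task
        }
        return ctx.stageAttemptNumber() + "_" + ctx.attemptNumber();
      }
    }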

@danny0405
Contributor

I would rather we revert this change first if there is no thorough solution, or put it behind a flag that is disabled by default.

@nsivabalan
Contributor Author

Hey @beyond1920: thanks for flagging this. I agree with Danny; we will probably revert the change for now.

@KnightChess
Contributor

@nsivabalan Hi, I have a question. Why can't we use taskAttemptId in Spark? From the results, it seems that under speculative execution, combining the instantTime with taskAttemptId or attemptId can distinguish the duplicate blocks. Additionally, taskAttemptId is global, so it can distinguish tasks even in cases of stage retries or executor crashes, as @beyond1920 said. Perhaps I'm missing some details; can you help clarify?
