[INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once And fix consuming TubeMQ data twice #10440

Merged
merged 10 commits into from
Jun 19, 2024

@XiaoYou201 XiaoYou201 commented Jun 18, 2024

[INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once

Fixes #10311

Motivation

Currently, every connector reports audit information with at-least-once semantics only. This PR builds on that reporting and implements exactly-once reporting of audit information, so that the audit information stays correct in most failure scenarios.

Modifications

  1. Use Flink's checkpoint mechanism to change the reporting flow; the modified flow is shown in Figure 1 and Figure 2. In Figure 1, the notifyCompleteCheckpoint callback uploads the audit information, replacing the scheduled upload.

Each Source/Sink saves the checkpointId of the checkpoint currently in progress, which is nowCheckpointId in the figure. When audit information is written to the buffer, nowCheckpointId is attached to it, indicating that the audit information was written during that checkpoint. Audit information and checkpointId are in a many-to-one relationship.

image
When a snapshot request is received, the current operator's nowCheckpointId is updated to the snapshot's checkpointId + 1. Once all operators in the task have completed the snapshot, the notifyCompleteCheckpoint method is called back.

At this point, AuditBuffer uploads the audit information whose attached checkpointId is less than or equal to the checkpointId passed as the parameter of notifyCompleteCheckpoint.

image
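The flow described above can be sketched roughly as follows. This is a minimal, Flink-free illustration and not the AuditBuffer code from this PR: the class name, the method names, the use of plain counts as "audit information", and the initial value of nowCheckpointId are all assumptions made for the example. The callback is named after Flink's `CheckpointListener#notifyCheckpointComplete`, which the description above calls notifyCompleteCheckpoint.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a checkpoint-tagged audit buffer (not InLong's AuditBuffer). */
public class CheckpointedAuditBuffer {

    // Audit counts grouped by the checkpoint during which they were written
    // (many audit entries map to one checkpointId).
    private final TreeMap<Long, Long> countsByCheckpoint = new TreeMap<>();

    // Assumption: the first checkpoint in progress has id 1.
    private long nowCheckpointId = 1L;

    // Stands in for a real audit reporter; only accumulates what was "uploaded".
    private long uploadedTotal = 0L;

    /** Buffer an audit count, tagged with the checkpoint currently in progress. */
    public void addAudit(long count) {
        countsByCheckpoint.merge(nowCheckpointId, count, Long::sum);
    }

    /** On a snapshot request, later audit entries belong to the next checkpoint. */
    public void snapshotState(long checkpointId) {
        nowCheckpointId = checkpointId + 1;
    }

    /** Upload only entries tagged with an id less than or equal to the completed checkpoint. */
    public void notifyCheckpointComplete(long checkpointId) {
        Map<Long, Long> ready = countsByCheckpoint.headMap(checkpointId, true);
        for (long count : ready.values()) {
            uploadedTotal += count;
        }
        ready.clear(); // headMap is a view, so this removes the uploaded entries
    }

    public long uploadedTotal() {
        return uploadedTotal;
    }

    public long pendingTotal() {
        long sum = 0L;
        for (long count : countsByCheckpoint.values()) {
            sum += count;
        }
        return sum;
    }
}
```

In this sketch, entries written after a snapshot request stay in the buffer until their own checkpoint completes, so nothing is uploaded for a checkpoint that never finishes.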

  2. The getCurConsumedPartitions method returns the partitions that the tube server has assigned to the client, including both the partitions from which the client has consumed data and those from which it has not. Under the previous logic, the offsets of partitions that had not been consumed were not recorded. This PR also records the offsets of those not-yet-consumed partitions.
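As a rough illustration of the second change, the sketch below builds an offset snapshot that covers every assigned partition, not only the ones that have delivered data. It is not the code from this PR: the class and method names are hypothetical, the partition-key-to-offset maps are a simplification of whatever getCurConsumedPartitions actually returns, and letting the locally tracked offset take precedence is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; simplified stand-in for the source's offset bookkeeping. */
public class PartitionOffsetSnapshot {

    /**
     * @param assignedOffsets offsets for all partitions assigned to the client
     *                        (simplified view of what the server reports)
     * @param consumedOffsets offsets tracked locally for partitions that have
     *                        already delivered data
     * @return offsets for every assigned partition, so that partitions without
     *         consumed data are no longer missing from the snapshot
     */
    public static Map<String, Long> buildSnapshot(
            Map<String, Long> assignedOffsets,
            Map<String, Long> consumedOffsets) {
        // Start from all assigned partitions, including unconsumed ones.
        Map<String, Long> snapshot = new HashMap<>(assignedOffsets);
        // Assumption: the locally tracked offset wins where both are present.
        snapshot.putAll(consumedOffsets);
        return snapshot;
    }
}
```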

@XiaoYou201 XiaoYou201 changed the title [INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once [INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once And fix TubeMQ data twice. Jun 18, 2024
@XiaoYou201 XiaoYou201 changed the title [INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once And fix TubeMQ data twice. [INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once And fix TubeMQ data twice Jun 18, 2024
@XiaoYou201 XiaoYou201 requested a review from EMsnap June 18, 2024 12:42
EMsnap previously approved these changes Jun 19, 2024
@EMsnap EMsnap self-requested a review June 19, 2024 01:43
@EMsnap EMsnap dismissed their stale review June 19, 2024 01:44

resolved comment still not solved

@XiaoYou201 XiaoYou201 changed the title [INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once And fix TubeMQ data twice [INLONG-10311][Sort] Implement TubeMQ Source report audit information exactly once And fix consuming TubeMQ data twice Jun 19, 2024
@dockerzhang dockerzhang merged commit 5c6aaee into apache:master Jun 19, 2024
11 checks passed
Development

Successfully merging this pull request may close these issues.

[Feature][Sort] TubeMQ source support report audit information exactly once
5 participants