[INLONG-10358][Sort] Make pulsar source support report audit information exactly once #10511

Merged
merged 2 commits into from
Jul 1, 2024


XiaoYou201
Contributor

[INLONG-10358][Sort] Make pulsar source support report audit information exactly once

Fixes #10358

Modifications

  1. Use Flink's checkpoint mechanism to change the reporting flow; the modified flow is shown in Figure 1 and Figure 2. In Figure 1, audit information is uploaded from the notifyCompleteCheckpoint callback instead of on a schedule. Each Source/Sink saves the checkpointId of the currently ongoing checkpoint, which is nowCheckpointId in the figure. When audit information is written to the Buffer, nowCheckpointId is attached to it, indicating that the audit information was written during this ongoing checkpoint. Audit information and checkpointId are in a many-to-one relationship.

[Figure 1]
When a snapshot request is received, the current operator's nowCheckpointId is updated to the snapshot's checkpointId + 1. When all operators in a task complete the snapshot, the notifyCompleteCheckpoint method is called back.

At this point, AuditBuffer uploads the audit information whose attached checkpointId is less than or equal to the checkpointId passed as the parameter of the notifyCompleteCheckpoint method.

[Figure 2]
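The flow described above can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions, not the code in this PR: the class name `CheckpointAuditBuffer`, the use of `String` for an audit record, and the starting value of `nowCheckpointId` are all hypothetical. The method names follow Flink's own spelling (`snapshotState`, and `notifyCheckpointComplete` from `CheckpointListener`), which corresponds to what the description calls notifyCompleteCheckpoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of a checkpoint-aligned audit buffer; not InLong's actual AuditBuffer.
class CheckpointAuditBuffer {

    // checkpointId -> audit records written while that checkpoint was ongoing (many-to-one).
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    // Assumption: the first checkpoint of a job has id 1.
    private long nowCheckpointId = 1L;

    // Called for every audit record; tags it with the currently ongoing checkpoint.
    synchronized void add(String auditRecord) {
        pending.computeIfAbsent(nowCheckpointId, id -> new ArrayList<>()).add(auditRecord);
    }

    // Called when the operator receives a snapshot request for checkpointId.
    synchronized void snapshotState(long checkpointId) {
        nowCheckpointId = checkpointId + 1;
    }

    // Called when the checkpoint is complete; returns the records that may now be uploaded.
    synchronized List<String> notifyCheckpointComplete(long checkpointId) {
        // View of all entries tagged with an id <= checkpointId.
        NavigableMap<Long, List<String>> completed = pending.headMap(checkpointId, true);
        List<String> toUpload = new ArrayList<>();
        for (List<String> records : completed.values()) {
            toUpload.addAll(records);
        }
        // Clearing the view removes the entries from the backing map,
        // so a repeated notification does not report them again.
        completed.clear();
        return toUpload;
    }

    public static void main(String[] args) {
        CheckpointAuditBuffer buffer = new CheckpointAuditBuffer();
        buffer.add("r1");
        buffer.snapshotState(1L);   // checkpoint 1 starts; later records belong to checkpoint 2
        buffer.add("r2");
        System.out.println(buffer.notifyCheckpointComplete(1L));
    }
}
```

In this sketch a record written after the snapshot request for checkpoint 1 stays in the buffer until checkpoint 2 completes. If the job fails and restarts from checkpoint 1, that record was never reported, so replaying it does not double-count it.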

  2. The getCurConsumedPartitions method gets the partitions assigned to the client by the tube server, including both the partitions from which the client has consumed data and those from which it has not. Previously, the offsets of partitions that had not been consumed were not recorded. This change records the offsets of those partitions as well.
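As a rough illustration of the second change, the snapshot can be built from the full set of assigned partitions rather than only from the partitions that already have a consumed offset. Everything named here is hypothetical: the class `PartitionOffsetSnapshot`, the use of `String` partition keys, and in particular the `NOT_CONSUMED` placeholder value, since the description does not say which offset is stored for a partition that has not been consumed.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch; the real code works on the connector's own partition and offset types.
class PartitionOffsetSnapshot {

    // Assumed placeholder for "nothing consumed yet"; the actual value is not stated in the PR.
    static final long NOT_CONSUMED = -1L;

    // Builds an offset entry for every assigned partition, consumed or not.
    static Map<String, Long> snapshot(Set<String> assignedPartitions,
                                      Map<String, Long> consumedOffsets) {
        Map<String, Long> result = new TreeMap<>();
        for (String partition : assignedPartitions) {
            result.put(partition, consumedOffsets.getOrDefault(partition, NOT_CONSUMED));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = snapshot(Set.of("p0", "p1"), Map.of("p0", 42L));
        System.out.println(offsets);
    }
}
```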

@aloyszhang
Contributor

What's the difference between PulsarSource.java, PulsarSourceReaderBase, and PulsarSourceBuilder and the original classes in flink-pulsar-connector?

@XiaoYou201
Contributor Author

XiaoYou201 commented Jun 26, 2024

What's the difference between PulsarSource.java, PulsarSourceReaderBase, and PulsarSourceBuilder and the original classes in flink-pulsar-connector?

There is almost no difference between these three classes and the original classes. However, I modified PulsarReader, and PulsarTableSource creates PulsarSource through PulsarSourceBuilder, while PulsarSource creates PulsarReader. So, to use a custom PulsarReader, I have to modify the method in PulsarSource that creates PulsarReader, and modify PulsarSourceBuilder so that it builds a PulsarSource that can create the custom PulsarReader. As for PulsarSourceReaderBase, it is not public, so I had to copy it out.
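The creation chain described above can be modeled with a few stand-in classes. This is a simplified sketch, not the connector's code: all class names below (`Reader`, `OriginalSource`, `CustomSource`, and so on) are hypothetical, and the real classes take configuration and context arguments that are omitted here.

```java
// Hypothetical stand-ins for the reader types.
interface Reader {
    String describe();
}

class OriginalReader implements Reader {
    public String describe() { return "original reader"; }
}

class CustomReader implements Reader {
    public String describe() { return "custom reader with audit"; }
}

// The original source hard-codes which reader it creates...
class OriginalSource {
    Reader createReader() { return new OriginalReader(); }
}

// ...and the original builder hard-codes which source it builds.
class OriginalSourceBuilder {
    OriginalSource build() { return new OriginalSource(); }
}

// To swap the reader, the source is copied with only createReader changed,
class CustomSource {
    Reader createReader() { return new CustomReader(); }
}

// and the builder is copied so that it returns the copied source.
class CustomSourceBuilder {
    CustomSource build() { return new CustomSource(); }
}

class CreationChainDemo {
    public static void main(String[] args) {
        System.out.println(new OriginalSourceBuilder().build().createReader().describe());
        System.out.println(new CustomSourceBuilder().build().createReader().describe());
    }
}
```

Because each step names the concrete class of the next one, changing the last link forces a copy of every class earlier in the chain, which is why the copied classes are nearly identical to the originals.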

@aloyszhang
Contributor

aloyszhang commented Jun 26, 2024

I think you mean PulsarSourceReaderBase is not public, not PulsarSourceBuilder.

@XiaoYou201
Contributor Author

I think you mean PulsarSourceReaderBase is not public, not PulsarSourceBuilder.

Yeah, I've corrected my description. Thanks for your suggestion and review.

@EMsnap
Contributor

EMsnap commented Jun 27, 2024

Please test the case where the job starts from an earlier checkpoint taken with the former version of the Pulsar connector.

@EMsnap EMsnap merged commit ada439c into apache:master Jul 1, 2024
10 checks passed