[Feature][Txn] Support for commit idempotency. #20356

Open
wants to merge 14 commits into
base: master

Conversation

thetumbled
Member

@thetumbled thetumbled commented May 19, 2023

PIP:
#19744

Motivation

  • Message queues such as Kafka and Pulsar guarantee the exactly-once semantics of their transaction feature only in the consume-transform-produce pattern, that is, when a transaction contains both production and consumption.
    The operations in such a transaction include the production on the sink side and the offset commit on the source side.
    Thanks to the atomicity of the transaction, these two operations either both take effect or both do not. The client does not need to know whether the transaction was committed successfully, because the end-to-end state is consistent either way. Therefore, the transaction feature implemented by Kafka and Pulsar supports committing or aborting only once, and repeating a commit or abort request afterwards is illegal; that is, commit operations are not idempotent.

  • But in many other use cases that differ from the consume-transform-produce pattern, we need to know the exact state of the transaction after the commit request is submitted. For example:

    • In the produce-only case, the transaction contains only production operations and no offset commit, which is similar to RocketMQ.

    • The exactly-once semantics guaranteed by Flink are based on the Two-Phase Commit protocol implemented by Flink itself. When connecting to an external system, Flink requires the external system to do the following to ensure exactly-once semantics:

      1. Provide transaction functionality.
      2. Ensure that the transaction commit operation is idempotent.

      The details can be found in the following link:
      https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka

  • Although Kafka does not support idempotent commit operations, the Flink Kafka connector uses some tricks to make the commit of the last transaction idempotent, so that Flink + Kafka can guarantee exactly-once semantics in most cases, though some risk remains.

  • For Pulsar, however, it is currently impossible to make commit operations idempotent at all, because the implementation of transactions in Pulsar is quite different from Kafka's. I have posted a blog analyzing the differences between Pulsar and Kafka: https://blog.csdn.net/m0_43406494/article/details/130344399
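
To make the problem concrete, here is a minimal sketch of the difference between a one-shot commit and an idempotent commit. Both coordinator classes and all of their methods are hypothetical and exist only for illustration; they are not the Pulsar or Kafka API. The first coordinator forgets a transaction as soon as it ends, so a retried commit (for example after a lost response) fails; the second keeps the terminal state, so a retried commit succeeds again.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not the Pulsar or Kafka transaction API. */
final class CommitIdempotencySketch {
    enum TxnState { OPEN, COMMITTED, ABORTED }

    /** Forgets a transaction once it ends, so a retried commit is rejected. */
    static final class ForgetfulCoordinator {
        private final Map<Long, TxnState> open = new HashMap<>();

        void begin(long txnId) { open.put(txnId, TxnState.OPEN); }

        void commit(long txnId) {
            if (open.remove(txnId) == null) {
                throw new IllegalStateException("transaction not found: " + txnId);
            }
        }
    }

    /** Keeps the terminal state, so a retried commit is a harmless no-op. */
    static final class IdempotentCoordinator {
        private final Map<Long, TxnState> states = new HashMap<>();

        void begin(long txnId) { states.put(txnId, TxnState.OPEN); }

        void commit(long txnId) {
            TxnState state = states.get(txnId);
            if (state == null) {
                throw new IllegalStateException("transaction not found: " + txnId);
            }
            if (state == TxnState.ABORTED) {
                throw new IllegalStateException("transaction already aborted: " + txnId);
            }
            // OPEN becomes COMMITTED; COMMITTED stays COMMITTED.
            states.put(txnId, TxnState.COMMITTED);
        }

        TxnState stateOf(long txnId) { return states.get(txnId); }
    }

    public static void main(String[] args) {
        IdempotentCoordinator tc = new IdempotentCoordinator();
        tc.begin(1L);
        tc.commit(1L);
        tc.commit(1L); // retry after a lost response: accepted again
        System.out.println(tc.stateOf(1L));
    }
}
```

With the forgetful variant, the caller cannot tell whether a failed retry means "already committed" or "never existed", which is exactly the ambiguity a two-phase-commit sink cannot tolerate.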

Modifications

  • Provide idempotence of commit operations for transactions in Pulsar; the feature is disabled by default.
    • We will introduce a TransactionMetadataPreserver, a component of the TC (transaction coordinator), to store the metadata of terminated transactions.
    • Once we catch the TransactionNotFound exception, we will query the TransactionMetadataPreserver for the state of the transaction.
    • The client will attach its clientName to the transaction, and the TransactionMetadataPreserver will preserve the metadata of up to TransactionMetaPersistCount transactions for each client.
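
The per-client bookkeeping described above could look roughly like the following. This is a sketch under assumptions, not the code in this PR: the class name PreserverSketch, its methods, and the in-memory LinkedHashMap storage are all invented for illustration (the real component also persists entries). It keeps at most persistCount terminated transactions per client name and treats a count of 0 as "disabled", matching the default described here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory sketch of a per-client store of terminated transactions. */
final class PreserverSketch {
    enum TxnStatus { COMMITTED, ABORTED }

    // Plays the role of TransactionMetaPersistCount; 0 disables the feature.
    private final int persistCount;
    private final Map<String, LinkedHashMap<Long, TxnStatus>> byClient = new ConcurrentHashMap<>();

    PreserverSketch(int persistCount) { this.persistCount = persistCount; }

    boolean enabled() { return persistCount > 0; }

    /** Records the terminal status of a transaction, evicting the client's oldest entry if full. */
    void append(String clientName, long txnId, TxnStatus status) {
        if (!enabled()) {
            return;
        }
        LinkedHashMap<Long, TxnStatus> entries = byClient.computeIfAbsent(clientName, k ->
                new LinkedHashMap<Long, TxnStatus>() {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<Long, TxnStatus> eldest) {
                        return size() > persistCount;
                    }
                });
        synchronized (entries) {
            entries.put(txnId, status);
        }
    }

    /** Consulted when the active transaction store reports "transaction not found". */
    Optional<TxnStatus> lookup(String clientName, long txnId) {
        LinkedHashMap<Long, TxnStatus> entries = byClient.get(clientName);
        if (entries == null) {
            return Optional.empty();
        }
        synchronized (entries) {
            return Optional.ofNullable(entries.get(txnId));
        }
    }

    public static void main(String[] args) {
        PreserverSketch preserver = new PreserverSketch(2);
        preserver.append("client-a", 1L, TxnStatus.COMMITTED);
        System.out.println(preserver.lookup("client-a", 1L).isPresent());
    }
}
```

Bounding the store per client rather than globally means one busy client cannot evict the entries another client still needs for its retries.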

API Changes

  • wire protocol change
    message CommandNewTxn {
        required uint64 request_id = 1;
        optional uint64 txn_ttl_seconds = 2 [default = 0];
        optional uint64 tc_id = 3 [default = 0];
        optional string client_name = 4; // added in this PR
    }

    message CommandEndTxn {
        required uint64 request_id = 1;
        optional uint64 txnid_least_bits = 2 [default = 0];
        optional uint64 txnid_most_bits = 3 [default = 0];
        optional TxnAction txn_action = 4;
        optional string client_name = 5; // added in this PR
    }

    message TransactionMetadataEntry {
        ...
        optional string clientName = 13; // added in this PR
    }

    enum ServerError {
        ...
        TransactionPreserverClosed = 26; // added in this PR: transaction metadata preserver is closed
    }
  • client configuration change
    @ApiModelProperty(
            name = "clientName",
            value = "Client name that is used to save transaction metadata."
    )
    private String clientName;
  • broker configuration change
    @FieldContext(
            category = CATEGORY_TRANSACTION,
            doc = "Max number of txnMeta entries of aborted transactions to persist in each TC. "
            + "If the number of terminated transactions is greater than this value, the oldest terminated "
            + "transaction will be removed from the cache and persisted in the store. "
            + "The default value is 0, which disables persistence of terminated transactions."
    )
    private int TransactionMetaPersistCount = 0;

    @FieldContext(
            category = CATEGORY_TRANSACTION,
            doc = "Time in hours to persist the transaction metadata in TransactionMetadataPreserver."
    )
    private long TransactionMetaPersistTimeInHour = 72;

    @FieldContext(
            category = CATEGORY_TRANSACTION,
            doc = "Interval in seconds to check the expired transaction metadata in TransactionMetadataPreserver."
    )
    private long TransactionMetaExpireCheckIntervalInSecond = 300;
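
How the two time-based settings might interact can be sketched as follows. This is an assumption about the intended behavior, not code from this PR: the class ExpirySketch and its methods are hypothetical. Each terminated transaction is stamped with its termination time, and a periodic check drops entries older than the retention period (the role of TransactionMetaPersistTimeInHour).

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of time-based expiry of preserved transaction metadata. */
final class ExpirySketch {
    private final long persistMillis;
    // txnId -> termination time in milliseconds
    private final Map<Long, Long> terminatedAt = new LinkedHashMap<>();

    ExpirySketch(long persistTimeInHour) {
        this.persistMillis = TimeUnit.HOURS.toMillis(persistTimeInHour);
    }

    void record(long txnId, long nowMillis) {
        terminatedAt.put(txnId, nowMillis);
    }

    /** Removes entries older than the retention period; returns how many were removed. */
    int expire(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Long, Long>> it = terminatedAt.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > persistMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean contains(long txnId) {
        return terminatedAt.containsKey(txnId);
    }

    public static void main(String[] args) {
        ExpirySketch sketch = new ExpirySketch(72);
        sketch.record(1L, 0L);
        System.out.println(sketch.expire(TimeUnit.HOURS.toMillis(73)));
    }
}
```

In a broker, a method like expire could be driven by a ScheduledExecutorService via scheduleAtFixedRate with a period of TransactionMetaExpireCheckIntervalInSecond seconds; passing the clock in as a parameter keeps the check easy to test.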

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: thetumbled#21

@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label May 19, 2023
@mattisonchao
Member

Please keep this PR separate so that it can help committers review it.
thanks :)

@thetumbled
Member Author

thetumbled commented May 19, 2023

Please keep this PR separate so that it can help committers review it. thanks :)

Fixed, this is the sole PR related to PIP-255.
#19744

@github-actions

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jun 30, 2023
@Technoboy- Technoboy- modified the milestones: 3.1.0, 3.2.0 Jul 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
@lhotari lhotari removed this from the 4.0.0 milestone Oct 14, 2024
@lhotari lhotari added this to the 4.1.0 milestone Oct 14, 2024
Labels
area/transaction doc-required Your PR changes impact docs and you will update later. Stale
6 participants