-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transaction] Fix transaction buffer client handle endTxn op when topic or sub have been deleted. #11304
[Transaction] Fix transaction buffer client handle endTxn op when topic or sub have been deleted. #11304
Conversation
admin.topics().createSubscription(topic + "_commit_sub", subName, MessageId.earliest); | ||
|
||
|
||
Awaitility.await().until(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we have to retry this operation (and the commit
below) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waitting pending ack init.
@@ -1930,7 +1930,7 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { | |||
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic, | |||
txnID, txnAction); | |||
} | |||
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString()); | |||
CompletableFuture<Optional<Topic>> topicFuture = service.getTopic(topic, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users can disable the topic auto-creation, this means they want to prevent the topic auto-creation. After this change, the TC might re-create a topic even if the topic has been deleted definitely. This will break the behavior of topic auto-creation disabled.
I think the current fix is:
- If the topic has been deleted, we should skip all the TC logs of this topic.
- If the topic has not been deleted but only not loaded by any broker, we can load the topic and complete the transaction.
WDYT @congbobo184
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@congbobo184 I am cutting 2.8.1, would you please address this comment?
Hi @eolivelli @codelipenghui please review again. thanks:) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not still very comfortable with those wait loops.
Can you please check the latest utility methods we added in order to wait for the TC to be ready?
If we do not have some useful utility, can you please add a new utility method?
admin.topics().createNonPartitionedTopic(topic + "_commit_sub"); | ||
admin.topics().createSubscription(topic + "_commit_sub", subName, MessageId.earliest); | ||
|
||
Awaitility.await().until(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some utility method to wait for the coordinator to be started.
Can we use it?
It is not clear why we need this wait loop.
It may hide many other internal errors.
This is a test case and we should have full control over what's going on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adopt your comment, thank for your comments. please review again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
…op_init_topic # Conflicts: # pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
* an instance of {@link Boolean} is returned | ||
* if the operation succeeds. | ||
*/ | ||
CompletableFuture<Boolean> checkManagedLedgerInitializedBefore(String name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompletableFuture<Boolean> checkManagedLedgerInitializedBefore(String name); | |
CompletableFuture<Boolean> asyncExists(String name); |
* an instance of {@link Boolean} is returned | ||
* if the operation succeeds. | ||
*/ | ||
CompletableFuture<Boolean> exists(String path); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompletableFuture<Boolean> exists(String path); | |
CompletableFuture<Boolean> asyncExists(String ledgerName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall LGTM
I left some suggestions if you want to make some improvements
txnID, TxnAction.valueOf(txnAction)); | ||
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, | ||
ServerError.ServiceNotReady, | ||
"The topic " + topic + " is not exist in broker.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a typo, what about: "The topic " + topic + " does not exist in broker."
"The topic " + topic + " is not exist in broker.", | ||
txnID.getMostSigBits(), txnID.getLeastSigBits())); | ||
} else { | ||
log.warn("handleEndTxnOnPartition fail ! The topic {} have not been created, " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: "has not been created"
"Topic " + optionalTopic.get().getName() | ||
+ " subscription " + subName + " is not exist.")); | ||
log.warn("handleEndTxnOnSubscription fail! " | ||
+ "topic {} subscription {} is not exist. txnId: [{}], txnAction: [{}]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"is not exist" -> "does not exist"
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), | ||
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); | ||
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); | ||
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need 16 partitions for this test ?
is there an utility method to setup the system topics related to Transactions ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's good comment, we don't need to create tc topic in this test.
…ic or sub have been deleted. (#11304) now, when topic have been deleted we will not handle the `endTxnOnTopic` and endTxnOnSub. now, when sub have been deleted we will not handle the `endTxnOnSub`. when topic or sub have been deleted, we should return success to tc on `endTxnOnTopic` and endTxnOnSub operation. when topic not exist in this broker, we should judge the topic if it have been created. if not created, we should return success. (cherry picked from commit a6e66dd)
…ic or sub have been deleted. (apache#11304) ## Motivation now, when topic have been deleted we will not handle the `endTxnOnTopic` and endTxnOnSub. now, when sub have been deleted we will not handle the `endTxnOnSub`. when topic or sub have been deleted, we should return success to tc on `endTxnOnTopic` and endTxnOnSub operation. ## implement when topic not exist in this broker, we should judge the topic if it have been created. if not created, we should return success.
Motivation
now, when topic have been deleted we will not handle the
endTxnOnTopic
and endTxnOnSub.now, when sub have been deleted we will not handle the
endTxnOnSub
.when topic or sub have been deleted, we should return success to tc on
endTxnOnTopic
and endTxnOnSub operation.implement
when topic not exist in this broker, we should judge the topic if it have been created. if not created, we should return success.
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)