-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Donot create transaction components for function work topic #11543
Donot create transaction components for function work topic #11543
Conversation
try { | ||
workerConfig = WorkerConfig.load(fnWorkerConfigFile); | ||
} catch (IOException e) { | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please handle the exception not print it
Assert.isNonEmpty(workerConfig); | ||
Assert.isNonEmpty(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to handle the returned value?
@@ -300,6 +305,22 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS | |||
} | |||
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); | |||
} | |||
private boolean checkTopicIsFunctionWorkerService(PersistentTopic topic){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better find a common place to avoid the method duplicate
this.pendingAckHandle = new PendingAckHandleImpl(this); | ||
} else { | ||
this.pendingAckHandle = new PendingAckHandleDisabled(); | ||
} | ||
IS_FENCED_UPDATER.set(this, FALSE); | ||
} | ||
private boolean checkTopicIsFunctionWorkerService(PersistentTopic topic){ | ||
String fnWorkerConfigFile = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function work config file may be in a different place, user can define it with environment variable
/pulsarbot run-failure-checks |
WorkerConfig workerConfig = topic.getBrokerService().getPulsar().getWorkerConfig().get(); | ||
String topicName = topic.getName(); | ||
return workerConfig.getClusterCoordinationTopic().equals(topicName) | ||
|| workerConfig.getFunctionAssignmentTopic().equals(topicName) | ||
|| workerConfig.getFunctionMetadataTopic().equals(topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can only work for the broker run with the function worker? If the function working running independently, here will get an incorrect worker config.
I think we already have a PR to support transaction buffer and pending ack lazy creation, if new transactions happens on a topic, the transaction buffer and the pending ack will not be created?
Fixes #11481
Motivation
Modifications
Verifying this change
Add
TransactionTest::testFilterFunctionTopicForTransactionComponent
to verifyDoes this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
For this PR, we don't need to update docs.