Skip to content

Commit

Permalink
[fix][broker] Fix heartbeat namespace create event topic and cannot d…
Browse files Browse the repository at this point in the history
…elete heartbeat topic (apache#21360)

Co-authored-by: fanjianye <fanjianye@bigo.sg>
Co-authored-by: Jiwei Guo <technoboy@apache.org>
(cherry picked from commit 700a29d)
  • Loading branch information
TakaHiR07 authored and shibd committed Oct 23, 2023
1 parent d1a87d9 commit 3cd9317
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,23 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.completedFuture(null);
}
return sendTopicPolicyEvent(topicName, ActionType.DELETE, null);
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException(
"Not allowed to update topic policy for the heartbeat topic"));
}
return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies);
}

private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
TopicPolicies policies) {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.failedFuture(
new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic"));
}
return pulsarService.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(namespacePolicies -> {
Expand Down Expand Up @@ -217,6 +220,9 @@ public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesC
@Override
public TopicPolicies getTopicPolicies(TopicName topicName,
boolean isGlobal) throws TopicPoliciesCacheNotInitException {
if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return null;
}
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception {
Optional<Topic> optionalTopic = pulsar.getBrokerService()
.getTopic(topicName.getPartition(1).toString(), false).join();
Assert.assertTrue(optionalTopic.isEmpty());

TopicName heartbeatTopicName = TopicName.get("persistent",
namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
admin.topics().getRetention(heartbeatTopicName.toString());
optionalTopic = pulsar.getBrokerService()
.getTopic(topicName.getPartition(1).toString(), false).join();
Assert.assertTrue(optionalTopic.isEmpty());
}

@Test
Expand All @@ -203,6 +210,22 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
});
}

@Test
public void testHeartbeatTopicBeDeleted() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);

List<String> topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
Assert.assertEquals(topics.size(), 1);
Assert.assertEquals(topics.get(0), heartbeatTopicName.toString());

admin.topics().delete(heartbeatTopicName.toString(), true);
topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
Assert.assertEquals(topics.size(), 0);
}

@Test
public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
Expand Down

0 comments on commit 3cd9317

Please sign in to comment.