Skip to content

Commit

Permalink
[fix][broker] Fix can not delete namespace by force (#18307)
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyepianzhou authored Nov 21, 2022
1 parent 47981c9 commit 6c9ff8f
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,38 +218,47 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
}))
.thenCompose(topics -> {
List<String> allTopics = topics.get(0);
ArrayList<String> allUserCreatedTopics = new ArrayList<>();
List<String> allPartitionedTopics = topics.get(1);
if (!force) {
boolean hasNonSystemTopic = false;
for (String topic : allTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
break;
}
ArrayList<String> allUserCreatedPartitionTopics = new ArrayList<>();
boolean hasNonSystemTopic = false;
List<String> allSystemTopics = new ArrayList<>();
List<String> allPartitionedSystemTopics = new ArrayList<>();
for (String topic : allTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedTopics.add(topic);
} else {
allSystemTopics.add(topic);
}
if (!hasNonSystemTopic) {
for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
break;
}
}
}
for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedPartitionTopics.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}

}
if (!force) {
if (hasNonSystemTopic) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
}
}
return namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
}).thenCompose(__ -> {
return internalDeleteTopicsAsync(allTopics);
}).thenCompose(__ -> {
return internalDeletePartitionedTopicsAsync(allPartitionedTopics);
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(allUserCreatedTopics);
}).thenCompose(ignore -> {
return internalDeletePartitionedTopicsAsync(allUserCreatedPartitionTopics);
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(allSystemTopics);
}).thenCompose(ignore__ -> {
return internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
});
})
.thenCompose(__ -> pulsar().getNamespaceService()
.thenCompose(ignore -> pulsar().getNamespaceService()
.getNamespaceBundleFactory().getBundlesAsync(namespaceName))
.thenCompose(bundles -> FutureUtil.waitForAll(bundles.getBundles().stream()
.map(bundle -> pulsar().getNamespaceService().getOwnerAsync(bundle)
Expand All @@ -271,7 +280,7 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
return CompletableFuture.completedFuture(null);
})
).collect(Collectors.toList())))
.thenCompose(__ -> internalClearZkSources());
.thenCompose(ignore -> internalClearZkSources());
}

private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String> topicNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1219,17 +1219,24 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
.thenCompose(ignore -> deleteTopicPolicies())
.thenCompose(ignore -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));
deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
.thenCompose(ignore -> {
if (!this.getBrokerService().getPulsar().getBrokerService()
.isSystemTopic(TopicName.get(topic))) {
return deleteTopicPolicies();
} else {
return CompletableFuture.completedFuture(null);
}
})
.thenCompose(ignore -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));

FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
Expand All @@ -92,6 +93,8 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
Expand Down Expand Up @@ -119,6 +122,7 @@
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -2018,4 +2022,66 @@ private void cleanupNamespaceByNsCollection(Collection<String> namespaces)
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(forceDeleteNamespaceAllowedOriginalValue);
}

@Test
public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
String namespace = this.testTenant + "/delete-namespace";
String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-namespace",
"testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();

// 0. enable topic level polices and system topic
pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
pulsar.getConfig().setSystemTopicEnabled(true);
pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
policesService.setAccessible(true);
policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));

// 1. create a test namespace.
admin.namespaces().createNamespace(namespace);
// 2. create a test topic.
admin.topics().createNonPartitionedTopic(topic);
// 3. change policy of the topic.
admin.topicPolicies().setMaxConsumers(topic, 5);
// 4. change the order of the topics in this namespace.
List<String> topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
Assert.assertTrue(topics.size() >= 2);
for (int i = 0; i < topics.size(); i++) {
if (topics.get(i).contains(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
String systemTopic = topics.get(i);
topics.set(i, topics.get(0));
topics.set(0, systemTopic);
}
}
NamespaceService mockNamespaceService = spy(pulsar.getNamespaceService());
Field namespaceServiceField = pulsar.getClass().getDeclaredField("nsService");
namespaceServiceField.setAccessible(true);
namespaceServiceField.set(pulsar, mockNamespaceService);
doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
// 5. delete the namespace
admin.namespaces().deleteNamespace(namespace, true);
// cleanup
resetBroker();
}

@Test
public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
String namespace = this.testTenant + "/delete-systemTopic";
String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
"testNotClearTopicPolicesWhenDeleteSystemTopic").toString();

// 0. enable topic level polices and system topic
pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
pulsar.getConfig().setSystemTopicEnabled(true);
Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
policesService.setAccessible(true);
policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
// 1. create a test namespace.
admin.namespaces().createNamespace(namespace);
// 2. create a test topic.
admin.topics().createNonPartitionedTopic(topic);
// 3. change policy of the topic.
admin.topicPolicies().setMaxConsumers(topic, 5);
// 4. delete the policies topic and the topic wil not to clear topic polices
admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,35 +89,6 @@ public void produceAndCommitTest() throws Exception {
produceTest(true);
}

@Test
public void testDeleteNamespaceBeforeCommit() throws Exception {
final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
PulsarClient pulsarClient = this.pulsarClient;
Transaction tnx = pulsarClient.newTransaction()
.withTransactionTimeout(60, TimeUnit.SECONDS)
.build().get();
long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
Assert.assertTrue(txnIdMostBits > -1);
Assert.assertTrue(txnIdLeastBits > -1);

@Cleanup
Producer<byte[]> outProducer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();

String content = "Hello Txn";
outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();

try {
deleteNamespaceGraceFully(NAMESPACE1, true);
} catch (Exception ignore) {}
tnx.commit().get();
}

@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public void testTopicTransactionMetrics() throws Exception {
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();

admin.namespaces().deleteNamespace(NAMESPACE1, true);
admin.namespaces().createNamespace(NAMESPACE1);
try {
// init pending ack
@Cleanup
Expand All @@ -231,7 +232,7 @@ public void testCreateTransactionSystemTopic() throws Exception {

// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
assertEquals(list.size(), 2);
assertFalse(list.isEmpty());
list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));

try {
Expand Down

0 comments on commit 6c9ff8f

Please sign in to comment.