From 866a4ab29c30490592bea9ec63f1955417f8fb79 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 30 Aug 2023 18:22:51 +0800 Subject: [PATCH 1/4] Fix deleting topic not delete the related topic policy and schema. --- .../pulsar/broker/service/AbstractTopic.java | 16 +----- .../pulsar/broker/service/BrokerService.java | 54 ++++++++++++++----- .../systopic/PartitionedSystemTopicTest.java | 25 +++++++++ 3 files changed, 67 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index b15f8cbf0b848..78df7ae5dc579 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -674,21 +674,7 @@ private boolean allowAutoUpdateSchema() { @Override public CompletableFuture deleteSchema() { - String id = getSchemaId(); - SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); - return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) - .thenCompose(schema -> { - if (schema != null) { - // It's different from `SchemasResource.deleteSchema` - // because when we delete a topic, the schema - // history is meaningless. But when we delete a schema of a topic, a new schema could be - // registered in the future. - log.info("Delete schema storage of id: {}", id); - return schemaRegistryService.deleteSchemaStorage(id); - } else { - return CompletableFuture.completedFuture(null); - } - }); + return brokerService.deleteSchema(TopicName.get(getName())); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 199901b090251..e2940c273d506 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -119,6 +119,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; +import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; +import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -159,6 +161,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FieldParser; import org.apache.pulsar.common.util.FutureUtil; @@ -1156,26 +1159,32 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD CompletableFuture future = new CompletableFuture<>(); CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - - deleteTopicAuthenticationFuture.whenComplete((v, ex) -> { + deleteTopicAuthenticationFuture + .thenCompose(__ -> deleteSchema(tn)) + .thenCompose(__ -> { + if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + return deleteTopicPolicies(tn); + } + return CompletableFuture.completedFuture(null); + }).whenComplete((v, ex) -> { if (ex != null) { future.completeExceptionally(ex); return; } CompletableFuture mlConfigFuture = getManagedLedgerConfig(topicName); managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), - mlConfigFuture, new DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - future.complete(null); - } + mlConfigFuture, new DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + future.complete(null); + } - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); - }); + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + }); return future; } @@ -3451,6 +3460,25 @@ public CompletableFuture deleteTopicPolicies(TopicName topicName) { .deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName())); } + CompletableFuture deleteSchema(TopicName topicName) { + String base = topicName.getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService(); + return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) + .thenCompose(schema -> { + if (schema != null) { + // It's different from `SchemasResource.deleteSchema` + // because when we delete a topic, the schema + // history is meaningless. But when we delete a schema of a topic, a new schema could be + // registered in the future. + log.info("Delete schema storage of id: {}", id); + return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id); + } else { + return CompletableFuture.completedFuture(null); + } + }); + } + private CompletableFuture checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) { return pulsar.getPulsarResources().getNamespaceResources() .getPoliciesAsync(topicName.getNamespaceObject()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 008c2143a3566..34e7dfe92e5d7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.systopic; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -37,6 +39,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -55,6 +58,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; @@ -299,4 +303,25 @@ public void testSystemTopicNotCheckExceed() throws Exception { writer1.get().close(); writer2.get().close(); } + + @Test + public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws Exception { + final String ns = "prop/ns-test"; + admin.namespaces().createNamespace(ns, 2); + final String topicName = "persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded"; + admin.topics().createNonPartitionedTopic(topicName); + pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close(); + admin.topicPolicies().setMaxConsumers(topicName, 2); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2)); + CompletableFuture> topic = pulsar.getBrokerService().getTopic(topicName, false); + PersistentTopic persistentTopic = (PersistentTopic) topic.join().get(); + persistentTopic.close(); + admin.topics().delete(topicName); + TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName)); + assertNull(topicPolicies); + String base = TopicName.get(topicName).getPartitionedTopicName(); + String id = TopicName.get(base).getSchemaName(); + CompletableFuture schema = pulsar.getSchemaRegistryService().getSchema(id); + assertNull(schema.join()); + } } From a2f3403592c3b8f3601703af605f62cb131d86f6 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 30 Aug 2023 20:06:25 +0800 Subject: [PATCH 2/4] fix checkstyle --- .../java/org/apache/pulsar/broker/service/AbstractTopic.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 78df7ae5dc579..cef2dd2080cf0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -58,7 +58,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.plugin.EntryFilter; -import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; From 48ff1241cef6d60f1594348d97e6d05d631091b6 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 30 Aug 2023 21:48:10 +0800 Subject: [PATCH 3/4] fix --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e2940c273d506..391affef1da9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1162,7 +1162,8 @@ private CompletableFuture deleteTopicInternal(String topic, boolean forceD deleteTopicAuthenticationFuture .thenCompose(__ -> deleteSchema(tn)) .thenCompose(__ -> { - if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) { + if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic) + && getPulsar().getConfiguration().isSystemTopicEnabled()) { return deleteTopicPolicies(tn); } return CompletableFuture.completedFuture(null); From d23edc0c5482e14528ce4d437a4abcbf04df6b82 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 31 Aug 2023 14:55:54 +0800 Subject: [PATCH 4/4] Fix test --- .../service/BrokerBkEnsemblesTests.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 9f19bda3647f3..40649a4164047 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -21,8 +21,8 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.fail; - import java.lang.reflect.Field; import java.util.Map.Entry; import java.util.NavigableMap; @@ -31,10 +31,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - import com.google.common.collect.Sets; import lombok.Cleanup; - import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -47,6 +45,7 @@ import org.apache.bookkeeper.util.StringUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -497,10 +496,31 @@ public void testDeleteTopicWithMissingData() throws Exception { // Expected } - // Deletion must succeed - admin.topics().delete(topic); + assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.topics().delete(topic)); + } + + @Test + public void testDeleteTopicWithoutTopicLoaded() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("prop/usc"); + admin.namespaces().createNamespace(namespace); + + String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic"); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); - // Topic will not be there after + @Cleanup + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + producer.close(); + admin.topics().unload(topic); + + admin.topics().delete(topic); assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), Optional.empty()); }