Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix deleting topic not delete the related topic policy and schema. #21093

Merged
merged 4 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -674,21 +673,7 @@ private boolean allowAutoUpdateSchema() {

@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
String id = getSchemaId();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Delete schema storage of id: {}", id);
return schemaRegistryService.deleteSchemaStorage(id);
} else {
return CompletableFuture.completedFuture(null);
}
});
return brokerService.deleteSchema(TopicName.get(getName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -159,6 +161,7 @@
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -1156,26 +1159,33 @@ private CompletableFuture<Void> deleteTopicInternal(String topic, boolean forceD
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
deleteTopicAuthenticationFuture
.thenCompose(__ -> deleteSchema(tn))
.thenCompose(__ -> {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
&& getPulsar().getConfiguration().isSystemTopicEnabled()) {
return deleteTopicPolicies(tn);
}
return CompletableFuture.completedFuture(null);
}).whenComplete((v, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
return;
}
CompletableFuture<ManagedLedgerConfig> mlConfigFuture = getManagedLedgerConfig(topicName);
managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(),
mlConfigFuture, new DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
future.complete(null);
}
mlConfigFuture, new DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
future.complete(null);
}

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});

return future;
}
Expand Down Expand Up @@ -3451,6 +3461,25 @@ public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}

CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
String base = topicName.getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService();
return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Delete schema storage of id: {}", id);
return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
} else {
return CompletableFuture.completedFuture(null);
}
});
}

private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
return pulsar.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.util.Map.Entry;
import java.util.NavigableMap;
Expand All @@ -31,10 +31,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import lombok.Cleanup;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand All @@ -47,6 +45,7 @@
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -497,10 +496,31 @@ public void testDeleteTopicWithMissingData() throws Exception {
// Expected
}

// Deletion must succeed
admin.topics().delete(topic);
assertThrows(PulsarAdminException.ServerSideErrorException.class, () -> admin.topics().delete(topic));
}

@Test
public void testDeleteTopicWithoutTopicLoaded() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("prop/usc");
admin.namespaces().createNamespace(namespace);

String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");

@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.build();

// Topic will not be there after
@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();

producer.close();
admin.topics().unload(topic);

admin.topics().delete(topic);
assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.systopic;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -37,6 +39,7 @@
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -55,6 +58,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -299,4 +303,25 @@ public void testSystemTopicNotCheckExceed() throws Exception {
writer1.get().close();
writer2.get().close();
}

@Test
public void testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded() throws Exception {
final String ns = "prop/ns-test";
admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/testDeleteTopicSchemaAndPolicyWhenTopicIsNotLoaded";
admin.topics().createNonPartitionedTopic(topicName);
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
admin.topicPolicies().setMaxConsumers(topicName, 2);
Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getMaxConsumers(topicName), 2));
CompletableFuture<Optional<Topic>> topic = pulsar.getBrokerService().getTopic(topicName, false);
PersistentTopic persistentTopic = (PersistentTopic) topic.join().get();
persistentTopic.close();
admin.topics().delete(topicName);
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPoliciesIfExists(TopicName.get(topicName));
assertNull(topicPolicies);
String base = TopicName.get(topicName).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
CompletableFuture<SchemaRegistry.SchemaAndMetadata> schema = pulsar.getSchemaRegistryService().getSchema(id);
assertNull(schema.join());
}
}