From 6724181a2196186bba2f35aa5ba1ad6e5b58f9c5 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 23 Aug 2022 15:49:16 -0700 Subject: [PATCH 1/3] fix: Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. --- .../broker/resources/NamespaceResources.java | 70 +++++++ .../broker/resources/TopicResources.java | 2 + .../broker/admin/impl/NamespacesBase.java | 6 +- .../admin/impl/PersistentTopicsBase.java | 152 +++++++------- .../pulsar/broker/lookup/TopicLookupBase.java | 9 +- .../broker/namespace/NamespaceService.java | 10 +- .../pulsar/broker/service/BrokerService.java | 86 +++++--- .../pulsar/broker/service/ServerCnx.java | 8 +- .../nonpersistent/NonPersistentTopic.java | 4 +- .../service/persistent/PersistentTopic.java | 197 +++++++++--------- .../admin/AdminApiMultiBrokersTest.java | 87 ++++++++ .../pulsar/broker/admin/NamespacesTest.java | 1 + .../lookup/http/HttpTopicLookupv2Test.java | 2 + .../service/BacklogQuotaManagerTest.java | 18 +- .../pulsar/broker/service/BrokerTestBase.java | 3 +- .../broker/service/ExclusiveProducerTest.java | 2 - .../broker/service/PersistentTopicTest.java | 9 +- .../client/impl/TopicsConsumerImplTest.java | 28 --- .../partition/PartitionedTopicMetadata.java | 1 + .../integration/topics/TestTopicDeletion.java | 183 ++++++++++++++++ 20 files changed, 628 insertions(+), 250 deletions(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 04238888f0753..5a7c22c284d97 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; import org.apache.pulsar.common.naming.NamespaceName; @@ -295,6 +296,75 @@ public CompletableFuture clearPartitionedTopicTenantAsync(String tenant) { final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant); return deleteIfExistsAsync(partitionedTopicPath); } + + public CompletableFuture markPartitionedTopicDeletedAsync(TopicName tn) { + if (tn.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } + log.debug("markPartitionedTopicDeletedAsync {}", tn); + return updatePartitionedTopicAsync(tn, md -> { + md.deleted = true; + return md; + }); + } + + public CompletableFuture unmarkPartitionedTopicDeletedAsync(TopicName tn) { + if (tn.isPartitioned()) { + return CompletableFuture.completedFuture(null); + } + log.debug("unmarkPartitionedTopicDeletedAsync {}", tn); + return updatePartitionedTopicAsync(tn, md -> { + md.deleted = false; + return md; + }); + } + + public CompletableFuture isPartitionedTopicBeingDeletedAsync(TopicName tn) { + if (tn.isPartitioned()) { + tn = TopicName.get(tn.getPartitionedTopicName()); + } + return getPartitionedTopicMetadataAsync(tn) + .thenApply(mdOpt -> mdOpt.map(partitionedTopicMetadata -> partitionedTopicMetadata.deleted) + .orElse(false)); + } + + public CompletableFuture runWithMarkDeleteAsync(TopicName topic, + Supplier> supplier) { + CompletableFuture future = new CompletableFuture<>(); + + markPartitionedTopicDeletedAsync(topic).whenCompleteAsync((markResult, markExc) -> { + final boolean mdFound; + if (markExc != null) { + if (markExc.getCause() instanceof MetadataStoreException.NotFoundException) { + mdFound = false; + } else { + log.error("Failed to mark the topic {} as deleted", topic, markExc); + future.completeExceptionally(markExc); + return; + } + } else { + mdFound = true; + } + + supplier.get().whenComplete((deleteResult, deleteExc) -> { + if (deleteExc != null && mdFound) { + unmarkPartitionedTopicDeletedAsync(topic) + .thenRun(() -> future.completeExceptionally(deleteExc)) + .exceptionally(ex -> { + log.warn("Failed to unmark the topic {} as deleted", topic, ex); + future.completeExceptionally(deleteExc); + return null; + }); + } else if (deleteExc != null) { + future.completeExceptionally(deleteExc); + } else { + future.complete(deleteResult); + } + }); + }); + + return future; + } } // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 0015dd148b35d..2b701ec6a0609 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -30,6 +30,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -37,6 +38,7 @@ import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; +@Slf4j public class TopicResources { private static final String MANAGED_LEDGER_PATH = "/managed-ledgers"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 59e1226e16c71..4dcd8809c7788 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -280,8 +280,10 @@ private CompletableFuture internalDeletePartitionedTopicsAsync(List> futures = new ArrayList<>(); for (String topicName : topicNames) { - futures.add(namespaceResources().getPartitionedTopicResources() - .deletePartitionedTopicAsync(TopicName.get(topicName))); + TopicName tn = TopicName.get(topicName); + futures.add(pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .runWithMarkDeleteAsync(tn, + () -> namespaceResources().getPartitionedTopicResources().deletePartitionedTopicAsync(tn))); } return FutureUtil.waitForAll(futures); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d28a4113cc27b..afe54f5cc57cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -745,8 +745,9 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, .thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force)); }) // Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted - ).thenCompose(__ -> namespaceResources() - .getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)) + ).thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .runWithMarkDeleteAsync(topicName, () -> namespaceResources() + .getPartitionedTopicResources().deletePartitionedTopicAsync(topicName))) .thenAccept(__ -> { log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName); asyncResponse.resume(Response.noContent().build()); @@ -780,6 +781,13 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, } private CompletableFuture internalRemovePartitionsTopicAsync(int numPartitions, boolean force) { + return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .runWithMarkDeleteAsync(topicName, + () -> internalRemovePartitionsTopicNoAutocreationDisableAsync(numPartitions, force)); + } + + private CompletableFuture internalRemovePartitionsTopicNoAutocreationDisableAsync(int numPartitions, + boolean force) { return FutureUtil.waitForAll(IntStream.range(0, numPartitions) .mapToObj(i -> { TopicName topicNamePartition = topicName.getPartition(i); @@ -2304,79 +2312,81 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated, properties); } else { - boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName); - getPartitionedTopicMetadataAsync(topicName, - authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> { - final int numPartitions = partitionMetadata.partitions; - if (numPartitions > 0) { - final CompletableFuture future = new CompletableFuture<>(); - final AtomicInteger count = new AtomicInteger(numPartitions); - final AtomicInteger failureCount = new AtomicInteger(0); - final AtomicReference partitionException = new AtomicReference<>(); + pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(allowAutoTopicCreation -> getPartitionedTopicMetadataAsync(topicName, + authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> { + final int numPartitions = partitionMetadata.partitions; + if (numPartitions > 0) { + final CompletableFuture future = new CompletableFuture<>(); + final AtomicInteger count = new AtomicInteger(numPartitions); + final AtomicInteger failureCount = new AtomicInteger(0); + final AtomicReference partitionException = new AtomicReference<>(); + + // Create the subscription on each partition + for (int i = 0; i < numPartitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + pulsar().getAdminClient().topics() + .createSubscriptionAsync(topicNamePartition.toString(), + subscriptionName, targetMessageId, false, properties) + .handle((r, ex) -> { + if (ex != null) { + // fail the operation on unknown exception or + // if all the partitioned failed due to + // subscription-already-exist + if (failureCount.incrementAndGet() == numPartitions + || !(ex instanceof PulsarAdminException + .ConflictException)) { + partitionException.set(ex); + } + } - // Create the subscription on each partition - for (int i = 0; i < numPartitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - pulsar().getAdminClient().topics() - .createSubscriptionAsync(topicNamePartition.toString(), - subscriptionName, targetMessageId, false, properties) - .handle((r, ex) -> { - if (ex != null) { - // fail the operation on unknown exception or - // if all the partitioned failed due to - // subscription-already-exist - if (failureCount.incrementAndGet() == numPartitions - || !(ex instanceof PulsarAdminException.ConflictException)) { - partitionException.set(ex); - } - } + if (count.decrementAndGet() == 0) { + future.complete(null); + } - if (count.decrementAndGet() == 0) { - future.complete(null); - } + return null; + }); + } catch (Exception e) { + log.warn("[{}] [{}] Failed to create subscription {} at message id {}", + clientAppId(), topicNamePartition, subscriptionName, targetMessageId, e); + future.completeExceptionally(e); + } + } - return null; - }); - } catch (Exception e) { - log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), - topicNamePartition, subscriptionName, targetMessageId, e); - future.completeExceptionally(e); - } - } + future.whenComplete((r, ex) -> { + if (ex != null) { + if (ex instanceof PulsarAdminException) { + asyncResponse.resume(new RestException((PulsarAdminException) ex)); + return; + } else { + asyncResponse.resume(new RestException(ex)); + return; + } + } - future.whenComplete((r, ex) -> { - if (ex != null) { - if (ex instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) ex)); - return; - } else { - asyncResponse.resume(new RestException(ex)); - return; - } - } + if (partitionException.get() != null) { + log.warn("[{}] [{}] Failed to create subscription {} at message id {}", + clientAppId(), topicName, + subscriptionName, targetMessageId, partitionException.get()); + if (partitionException.get() instanceof PulsarAdminException) { + asyncResponse.resume( + new RestException((PulsarAdminException) partitionException.get())); + return; + } else { + asyncResponse.resume(new RestException(partitionException.get())); + return; + } + } - if (partitionException.get() != null) { - log.warn("[{}] [{}] Failed to create subscription {} at message id {}", - clientAppId(), topicName, - subscriptionName, targetMessageId, partitionException.get()); - if (partitionException.get() instanceof PulsarAdminException) { - asyncResponse.resume( - new RestException((PulsarAdminException) partitionException.get())); - return; - } else { - asyncResponse.resume(new RestException(partitionException.get())); - return; - } + asyncResponse.resume(Response.noContent().build()); + }); + } else { + internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, + subscriptionName, targetMessageId, authoritative, replicated, properties); } - asyncResponse.resume(Response.noContent().build()); - }); - } else { - internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, - subscriptionName, targetMessageId, authoritative, replicated, properties); - } - }).exceptionally(ex -> { + })).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (!isRedirectException(ex)) { log.error("[{}] Failed to create subscription {} on topic {}", @@ -2402,12 +2412,12 @@ private void internalCreateSubscriptionForNonPartitionedTopic( MessageIdImpl targetMessageId, boolean authoritative, boolean replicated, Map properties) { - boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName); - validateTopicOwnershipAsync(topicName, authoritative) .thenCompose(__ -> { validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName); - return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation); + return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName) + .thenCompose(isAllowAutoTopicCreation -> pulsar().getBrokerService() + .getTopic(topicName.toString(), isAllowAutoTopicCreation)); }).thenApply(optTopic -> { if (optTopic.isPresent()) { return optTopic.get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 4fd0cc0b6a63c..51f617f26b2db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -68,11 +68,12 @@ protected CompletableFuture internalLookupTopicAsync(TopicName topic // Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists // in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned, // we'll return the true flag. - CompletableFuture existFuture = pulsar().getBrokerService() - .isAllowAutoTopicCreation(topicName) - || (!topicName.isPersistent() && !topicName.isPartitioned()) + CompletableFuture existFuture = (!topicName.isPersistent() && !topicName.isPartitioned()) ? CompletableFuture.completedFuture(true) - : pulsar().getNamespaceService().checkTopicExists(topicName); + : pulsar().getNamespaceService().checkTopicExists(topicName) + .thenCompose(exists -> exists ? CompletableFuture.completedFuture(true) + : pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)); + return existFuture; }) .thenCompose(exist -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 2e193823d9f39..af966129b3ea6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1183,7 +1183,15 @@ public CompletableFuture> getOwnedTopicListForNamespaceBundle(Names public CompletableFuture checkTopicExists(TopicName topic) { if (topic.isPersistent()) { - return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); + if (topic.isPartitioned()) { + return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExistsAsync(TopicName.get(topic.getPartitionedTopicName())) + .thenCompose(exists -> exists + ? pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic) + : CompletableFuture.completedFuture(false)); + } else { + return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); + } } else { if (topic.isPartitioned()) { final TopicName partitionedTopicName = TopicName.get(topic.getPartitionedTopicName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 14fb9a9a4b4ee..2392af7eb97c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -966,7 +966,9 @@ public CompletableFuture> getTopicIfExists(final String topic) { } public CompletableFuture getOrCreateTopic(final String topic) { - return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get); + return isAllowAutoTopicCreationAsync(topic) + .thenCompose(isAllowed -> getTopic(topic, isAllowed)) + .thenApply(Optional::get); } public CompletableFuture> getTopic(final String topic, boolean createIfMissing) { @@ -2886,30 +2888,38 @@ public CompletableFuture fetchPartitionedTopicMetadata if (metadata.partitions == 0 && !topicExists && !topicName.isPartitioned() - && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies) && pulsar.getBrokerService() .isDefaultTopicTypePartitioned(topicName, policies)) { - - pulsar.getBrokerService() - .createDefaultPartitionedTopicAsync(topicName, policies) - .thenAccept(md -> future.complete(md)) - .exceptionally(ex -> { - if (ex.getCause() - instanceof MetadataStoreException.AlreadyExistsException) { - // The partitioned topic might be created concurrently - fetchPartitionedTopicMetadataAsync(topicName) - .whenComplete((metadata2, ex2) -> { - if (ex2 == null) { - future.complete(metadata2); - } else { - future.completeExceptionally(ex2); - } - }); - } else { - future.completeExceptionally(ex); - } - return null; - }); + isAllowAutoTopicCreationAsync(topicName, policies).thenAccept(allowed -> { + if (allowed) { + pulsar.getBrokerService() + .createDefaultPartitionedTopicAsync(topicName, policies) + .thenAccept(md -> future.complete(md)) + .exceptionally(ex -> { + if (ex.getCause() + instanceof MetadataStoreException + .AlreadyExistsException) { + // The partitioned topic might be created concurrently + fetchPartitionedTopicMetadataAsync(topicName) + .whenComplete((metadata2, ex2) -> { + if (ex2 == null) { + future.complete(metadata2); + } else { + future.completeExceptionally(ex2); + } + }); + } else { + future.completeExceptionally(ex); + } + return null; + }); + } else { + future.complete(metadata); + } + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); } else { future.complete(metadata); } @@ -3103,34 +3113,46 @@ public Optional getListenPortTls() { } } - public boolean isAllowAutoTopicCreation(final String topic) { + public CompletableFuture isAllowAutoTopicCreationAsync(final String topic) { TopicName topicName = TopicName.get(topic); - return isAllowAutoTopicCreation(topicName); + return isAllowAutoTopicCreationAsync(topicName); } - public boolean isAllowAutoTopicCreation(final TopicName topicName) { + public CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName) { Optional policies = pulsar.getPulsarResources().getNamespaceResources() .getPoliciesIfCached(topicName.getNamespaceObject()); - return isAllowAutoTopicCreation(topicName, policies); + return isAllowAutoTopicCreationAsync(topicName, policies); } - public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional policies) { + private CompletableFuture isAllowAutoTopicCreationAsync(final TopicName topicName, + final Optional policies) { if (policies.isPresent() && policies.get().deleted) { log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}", topicName.getNamespaceObject()); - return false; + return CompletableFuture.completedFuture(false); } //System topic can always be created automatically if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) { - return true; + return CompletableFuture.completedFuture(true); } + final boolean allowed; AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies); if (autoTopicCreationOverride != null) { - return autoTopicCreationOverride.isAllowAutoTopicCreation(); + allowed = autoTopicCreationOverride.isAllowAutoTopicCreation(); } else { - return pulsar.getConfiguration().isAllowAutoTopicCreation(); + allowed = pulsar.getConfiguration().isAllowAutoTopicCreation(); } + + if (allowed && topicName.isPartitioned()) { + // cannot re-create topic while it is being deleted + return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .isPartitionedTopicBeingDeletedAsync(topicName) + .thenApply(beingDeleted -> !beingDeleted); + } else { + return CompletableFuture.completedFuture(allowed); + } + } public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional policies) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4c71ecee2e288..245a1ef51f2dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1081,9 +1081,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { return null; } - boolean createTopicIfDoesNotExist = forceTopicCreation - && service.isAllowAutoTopicCreation(topicName.toString()); - final long consumerEpoch; if (subscribe.hasConsumerEpoch()) { consumerEpoch = subscribe.getConsumerEpoch(); @@ -1092,7 +1089,10 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( subscribe.getSubscriptionPropertiesList()); - service.getTopic(topicName.toString(), createTopicIfDoesNotExist) + service.isAllowAutoTopicCreationAsync(topicName.toString()) + .thenApply(isAllowed -> forceTopicCreation && isAllowed) + .thenCompose(createTopicIfDoesNotExist -> + service.getTopic(topicName.toString(), createTopicIfDoesNotExist)) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { return FutureUtil diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 00f4c63d20bd5..cf46103cc357b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1011,7 +1011,9 @@ private CompletableFuture tryToDeletePartitionedMetadata() { if (!partitionedTopicExist) { return CompletableFuture.completedFuture(null); } else { - return partitionedTopicResources.deletePartitionedTopicAsync(topicName); + return getBrokerService().pulsar().getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources().runWithMarkDeleteAsync(topicName, + () -> partitionedTopicResources.deletePartitionedTopicAsync(topicName)); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 719ed3a0d261e..94cc0964c3708 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1154,7 +1154,6 @@ public CompletableFuture deleteForcefully() { private CompletableFuture delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, boolean closeIfClientsConnected) { - CompletableFuture deleteFuture = new CompletableFuture<>(); lock.writeLock().lock(); try { @@ -1174,106 +1173,113 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting - CompletableFuture closeClientFuture = new CompletableFuture<>(); - if (closeIfClientsConnected) { - List> futures = new ArrayList<>(); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); - producers.values().forEach(producer -> futures.add(producer.disconnect())); - subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); - FutureUtil.waitForAll(futures).thenRun(() -> { + + return getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> { + CompletableFuture deleteFuture = new CompletableFuture<>(); + + CompletableFuture closeClientFuture = new CompletableFuture<>(); + if (closeIfClientsConnected) { + List> futures = new ArrayList<>(); + replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + producers.values().forEach(producer -> futures.add(producer.disconnect())); + subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); + FutureUtil.waitForAll(futures).thenRun(() -> { + closeClientFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}] Error closing clients", topic, ex); + unfenceTopicToResume(); + closeClientFuture.completeExceptionally(ex); + return null; + }); + } else { closeClientFuture.complete(null); - }).exceptionally(ex -> { - log.error("[{}] Error closing clients", topic, ex); - unfenceTopicToResume(); - closeClientFuture.completeExceptionally(ex); - return null; - }); - } else { - closeClientFuture.complete(null); - } - - closeClientFuture.thenAccept(delete -> { - // We can proceed with the deletion if either: - // 1. No one is connected - // 2. We want to kick out everyone and forcefully delete the topic. - // In this case, we shouldn't care if the usageCount is 0 or not, just proceed - if (currentUsageCount() == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) { - CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); - brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - - deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) - .thenCompose(__ -> deleteTopicPolicies()) - .thenCompose(__ -> transactionBufferCleanupAndClose()) - .whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Error deleting topic", topic, ex); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(ex); - } else { - List> subsDeleteFutures = new ArrayList<>(); - subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); - - FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { - if (e != null) { - log.error("[{}] Error deleting topic", topic, e); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(e); - } else { - ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); - - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - unregisterTopicPolicyListener(); - - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } + } - @Override - public void - deleteLedgerFailed(ManagedLedgerException exception, - Object ctx) { - if (exception.getCause() - instanceof MetadataStoreException.NotFoundException) { - log.info("[{}] Topic is already deleted {}", - topic, exception.getMessage()); - deleteLedgerComplete(ctx); - } else { - unfenceTopicToResume(); - log.error("[{}] Error deleting topic", - topic, exception); - deleteFuture.completeExceptionally( - new PersistenceException(exception)); + closeClientFuture.thenAccept(delete -> { + // We can proceed with the deletion if either: + // 1. No one is connected + // 2. We want to kick out everyone and forcefully delete the topic. + // In this case, we shouldn't care if the usageCount is 0 or not, just proceed + if (currentUsageCount() == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) { + CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); + brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); + + deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) + .thenCompose(__ -> deleteTopicPolicies()) + .thenCompose(__ -> transactionBufferCleanupAndClose()) + .whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}] Error deleting topic", topic, ex); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(ex); + } else { + List> subsDeleteFutures = new ArrayList<>(); + subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); + + FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { + if (e != null) { + log.error("[{}] Error deleting topic", topic, e); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(e); + } else { + ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + brokerService.removeTopicFromCache(PersistentTopic.this); + + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + unregisterTopicPolicyListener(); + + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); } - } - }, null); - } - }); - } - }); - } else { + @Override + public void + deleteLedgerFailed(ManagedLedgerException exception, + Object ctx) { + if (exception.getCause() + instanceof MetadataStoreException.NotFoundException) { + log.info("[{}] Topic is already deleted {}", + topic, exception.getMessage()); + deleteLedgerComplete(ctx); + } else { + unfenceTopicToResume(); + log.error("[{}] Error deleting topic", + topic, exception); + deleteFuture.completeExceptionally( + new PersistenceException(exception)); + } + } + }, null); + + } + }); + } + }); + } else { + unfenceTopicToResume(); + deleteFuture.completeExceptionally(new TopicBusyException( + "Topic has " + currentUsageCount() + " connected producers/consumers")); + } + }).exceptionally(ex->{ unfenceTopicToResume(); - deleteFuture.completeExceptionally(new TopicBusyException( - "Topic has " + currentUsageCount() + " connected producers/consumers")); - } - }).exceptionally(ex->{ - unfenceTopicToResume(); - deleteFuture.completeExceptionally( - new TopicBusyException("Failed to close clients before deleting topic.")); - return null; - }); + deleteFuture.completeExceptionally( + new TopicBusyException("Failed to close clients before deleting topic.")); + return null; + }); + + return deleteFuture; + }); } finally { lock.writeLock().unlock(); } - return deleteFuture; } public CompletableFuture close() { @@ -2458,7 +2464,9 @@ private CompletableFuture tryToDeletePartitionedMetadata() { if (!partitionedTopicExist) { return CompletableFuture.completedFuture(null); } else { - return getBrokerService() + return getBrokerService().pulsar().getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources().runWithMarkDeleteAsync(topicName, () -> + getBrokerService() .fetchPartitionedTopicMetadataAsync(topicName) .thenCompose((metadata -> { List> persistentTopicExists = @@ -2489,7 +2497,8 @@ private CompletableFuture tryToDeletePartitionedMetadata() { .deletePartitionedTopicAsync(topicName); } }); - })); + })) + ); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java index 755eac1f7e844..13cac6b5905e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.junit.Assert.assertFalse; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; @@ -26,6 +27,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.MultiBrokerBaseTest; @@ -33,9 +36,16 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.internal.TopicsImpl; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -54,6 +64,7 @@ protected int numberOfAdditionalBrokers() { @Override protected void doInitConf() throws Exception { super.doInitConf(); + this.conf.setManagedLedgerMaxEntriesPerLedger(10); } @Override @@ -122,4 +133,80 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws Assert.assertEquals(lookupResultSet.size(), 1); } + @Test + public void testForceDeletePartitionedTopicWithSub() throws Exception { + final int numPartitions = 10; + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test")); + + admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc", + AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("partitioned") + .defaultNumPartitions(5) + .build()); + + RetentionPolicies retention = new RetentionPolicies(10, 10); + admin.namespaces().setRetention("tenant-xyz/ns-abc", retention); + final String topic = "persistent://tenant-xyz/ns-abc/topic-" + + RandomStringUtils.randomAlphabetic(5) + + "-testDeletePartitionedTopicWithSub"; + final String subscriptionName = "sub"; + ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get(); + + log.info("Creating producer and consumer"); + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscriptionName) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create(); + + log.info("producing messages"); + for (int i = 0; i < numPartitions * 100; ++i) { + producer.newMessage() + .key("" + i) + .value("value-" + i) + .send(); + } + producer.flush(); + + log.info("consuming some messages"); + for (int i = 0; i < numPartitions * 5; i++) { + Message m = consumer.receive(1, TimeUnit.MINUTES); + } + + log.info("trying to delete the topic {}", topic); + admin.topics().deletePartitionedTopic(topic, true); + + log.info("closing producer and consumer"); + producer.close(); + consumer.close(); + + // topic autocreate might sneak in after the delete / before close + // but the topic metadata should be consistent to go through deletion again + if (admin.topics().getList("tenant-xyz/ns-abc") + .stream().anyMatch(t -> t.contains(topic))) { + try { + admin.topics().deletePartitionedTopic(topic, true); + } catch (PulsarAdminException.NotFoundException nfe) { + // pass + } + + assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc") + .stream().filter(t -> t.contains(topic)).count()); + assertEquals(0, + pulsar.getPulsarResources().getTopicResources() + .getExistingPartitions(TopicName.getPartitionedTopicName(topic)) + .get() + .stream().filter(t -> t.contains(topic)).count()); + assertFalse(admin.topics() + .getPartitionedTopicList("tenant-xyz/ns-abc") + .contains(topic)); + } else { + log.info("trying to create the topic again"); + ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get(); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 51c68ea0e89df..4f9c3b277660b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1291,6 +1291,7 @@ public void testSubscribeRate() throws Exception { pulsarClient.updateServiceUrl(lookupUrl.toString()); Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); pulsar.getConfiguration().setAuthorizationEnabled(true); + consumer.close(); admin.topics().deletePartitionedTopic(topicName, true); admin.namespaces().deleteNamespace(namespace); admin.tenants().deleteTenant("my-tenants"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java index 939c52c1b738f..77541fb5cb43a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java @@ -104,6 +104,8 @@ public void setUp() throws Exception { doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(auth).when(brokerService).getAuthorizationService(); doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore(); + doReturn(CompletableFuture.completedFuture(false)).when(brokerService) + .isAllowAutoTopicCreationAsync(any(TopicName.class)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index a5d1c4d943834..b619d72f1e861 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -467,6 +467,7 @@ public void testConsumerBacklogEvictionTimeQuotaPrecise() throws Exception { .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -496,7 +497,6 @@ public void testConsumerBacklogEvictionTimeQuotaPrecise() throws Exception { // All messages for both subscription should be cleaned up from backlog by backlog monitor task. assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0); assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 0); - client.close(); } @Test(timeOut = 60000) @@ -508,6 +508,7 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(), BacklogQuota.BacklogQuotaType.message_age); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -546,7 +547,6 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { }); assertEquals(ml.getSlowestConsumer().getReadPosition(), slowConsumerReadPos); - client.close(); } @Test(timeOut = 60000) @@ -558,6 +558,7 @@ public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Except .limitTime(5) // set limit time as 5 seconds .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(), BacklogQuota.BacklogQuotaType.message_age); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -599,7 +600,6 @@ public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Except assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9); assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9); }); - client.close(); } @@ -613,6 +613,7 @@ public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Excepti .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -646,8 +647,6 @@ public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Excepti TopicStats latestStats = getTopicStats(topic); assertEquals(latestStats.getSubscriptions().get(subName).getMsgBacklog(), 0); }); - - client.close(); } @Test @@ -660,6 +659,7 @@ public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception { .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build()); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); final String topic1 = "persistent://prop/ns-quota/topic11" + UUID.randomUUID(); @@ -695,6 +695,7 @@ public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exceptio .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); final String topic1 = "persistent://prop/ns-quota/topic12" + UUID.randomUUID(); @@ -736,7 +737,6 @@ public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exceptio stats = getTopicStats(topic1); // sub2 has empty backlog because it's backlog get cleaned up by backlog quota monitor task assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 0); - client.close(); } private Producer createProducer(PulsarClient client, String topic) @@ -1249,6 +1249,7 @@ public void testProducerExceptionAndThenUnblockTimeQuotaPrecise() throws Excepti .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); + @Cleanup final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) .statsInterval(0, TimeUnit.SECONDS).build(); final String topic1 = "persistent://prop/quotahold/exceptandunblock2"; @@ -1302,7 +1303,6 @@ public void testProducerExceptionAndThenUnblockTimeQuotaPrecise() throws Excepti sendException = e; } assertFalse(gotException, "unable to publish due to " + sendException); - client.close(); } @Test @@ -1314,6 +1314,7 @@ public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception { .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) .build(), BacklogQuota.BacklogQuotaType.message_age); + @Cleanup final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) .statsInterval(0, TimeUnit.SECONDS).build(); final String topic1 = "persistent://prop/quotahold/exceptandunblock2"; @@ -1367,12 +1368,12 @@ public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception { sendException = e; } assertFalse(gotException, "unable to publish due to " + sendException); - client.close(); } @Test(dataProvider = "backlogQuotaSizeGB", priority = 1) public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception { + admin.close(); pulsar.close(); long backlogQuotaByte = 10 * 1024; if (backlogQuotaSizeGB) { @@ -1383,6 +1384,7 @@ public void testBacklogQuotaInGB(boolean backlogQuotaSizeGB) throws Exception { config.setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction); pulsar = new PulsarService(config); pulsar.start(); + admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString()).build(); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, TimeUnit.SECONDS) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java index d4147696f8444..c9006f797fce5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java @@ -57,6 +57,7 @@ import com.google.common.collect.Sets; import java.util.Random; +import java.util.concurrent.TimeUnit; public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest { protected static final int ASYNC_EVENT_COMPLETION_WAIT = 100; @@ -230,7 +231,7 @@ public static void deleteNamespaceGraceFully(String ns, boolean force, PulsarAdm expectChangeEventTopics.add(t); } } - Awaitility.await().until(() -> { + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { boolean finished = true; for (String changeEventTopicName : expectChangeEventTopics){ boolean bundleExists = firstPulsar.getNamespaceService() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 747b5cdc66806..5f95e557b8c1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -26,10 +26,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - import io.netty.util.HashedWheelTimer; import lombok.Cleanup; - import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.PulsarClient; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 41f3dee76468d..45ebc685bbb27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -145,6 +145,7 @@ import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -1295,7 +1296,9 @@ public void testDeleteTopic() throws Exception { role, false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true); topic.addProducer(producer, new CompletableFuture<>()).join(); - assertTrue(topic.delete().isCompletedExceptionally()); + CompletableFuture cf = topic.delete(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); assertFalse((boolean) isFencedField.get(topic)); assertFalse((boolean) isClosingOrDeletingField.get(topic)); topic.removeProducer(producer); @@ -1312,7 +1315,9 @@ public void testDeleteTopic() throws Exception { Future f1 = topic.subscribe(getSubscriptionOption(cmd)); f1.get(); - assertTrue(topic.delete().isCompletedExceptionally()); + CompletableFuture cf2 = topic.delete(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> cf2.isDone()); + assertTrue(cf2.isCompletedExceptionally()); assertFalse((boolean) isFencedField.get(topic)); assertFalse((boolean) isClosingOrDeletingField.get(topic)); topic.unsubscribe(successSubName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index d26e9b36bd9d1..17287a4a12f59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -1230,34 +1230,6 @@ public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() t } } - @Test(timeOut = testTimeout) - public void testAutoDiscoverMultiTopicsPartitions() throws Exception { - final String topicName = "persistent://public/default/issue-9585"; - admin.topics().createPartitionedTopic(topicName, 3); - PatternMultiTopicsConsumerImpl consumer = (PatternMultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.STRING) - .topicsPattern(topicName) - .subscriptionName("sub-issue-9585") - .subscribe(); - - Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3); - Assert.assertEquals(consumer.getConsumers().size(), 3); - - admin.topics().deletePartitionedTopic(topicName, true); - consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); - Awaitility.await().untilAsserted(() -> { - Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0); - Assert.assertEquals(consumer.getConsumers().size(), 0); - }); - - admin.topics().createPartitionedTopic(topicName, 7); - consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); - Awaitility.await().untilAsserted(() -> { - Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7); - Assert.assertEquals(consumer.getConsumers().size(), 7); - }); - } - - @Test(timeOut = testTimeout) public void testPartitionsUpdatesForMultipleTopics() throws Exception { final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0"; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java index 94b7dc4a717cf..469a58c6190c5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java @@ -28,6 +28,7 @@ public class PartitionedTopicMetadata { /* Number of partitions for the topic */ public int partitions; + public boolean deleted; /* Topic properties */ public Map properties; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java new file mode 100644 index 0000000000000..0adb414e8f4f6 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.topics; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.fail; + +/** + * Test cases for compaction. + */ +@Slf4j +public class TestTopicDeletion extends PulsarTestSuite { + + final private boolean unload = false; + final private int numBrokers = 2; + + public void setupCluster() throws Exception { + brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10"); + brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false"); + brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false"); + this.setupCluster(""); + } + + protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster( + String clusterName, + PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { + specBuilder.numBrokers(numBrokers); + specBuilder.enableContainerLog(true); + return specBuilder; + } + + @Test(dataProvider = "ServiceUrls", timeOut=300_000) + public void testPartitionedTopicForceDeletion(Supplier serviceUrl) throws Exception { + + log.info("Creating tenant and namespace"); + + final String tenant = "test-partitioned-topic-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/partitioned-topic"; + final int numPartitions = numBrokers * 3; + final int numKeys = numPartitions * 50; + final String subscriptionName = "sub1"; + + this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin"); + + this.createNamespace(namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", namespace); + + this.createPartitionedTopic(topic, numPartitions); + + try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { + + log.info("Creating consumer"); + Consumer consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subscriptionName) + .subscribe(); + + log.info("Producing messages"); + try(Producer producer = client.newProducer() + .topic(topic) + .create() + ) { + for (int i = 0; i < numKeys; i++) { + producer.newMessage() + .key("" + i) + .value(("value-" + i).getBytes(UTF_8)) + .sendAsync(); + } + producer.flush(); + log.info("Successfully wrote {} values", numKeys); + } + + log.info("Consuming half of the messages"); + for (int i = 0; i < numKeys / 2; i++) { + Message m = consumer.receive(1, TimeUnit.MINUTES); + log.info("Read value {}", m.getKey()); + } + + if (unload) { + log.info("Unloading topic"); + pulsarCluster.runAdminCommandOnAnyBroker("topics", + "unload", topic); + } + + ContainerExecResult res; + log.info("Deleting the topic"); + try { + res = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "delete-partitioned-topic", "--force", topic); + assertNotEquals(0, res.getExitCode()); + } catch (ContainerExecException e) { + log.info("Second delete failed with ContainerExecException, could be ok", e); + if (!e.getMessage().contains("with error code 1")) { + fail("Expected different error code"); + } + } + + log.info("Close the consumer and delete the topic again"); + consumer.close(); + + res = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "delete-partitioned-topic", "--force", topic); + assertNotEquals(0, res.getExitCode()); + + Thread.sleep(5000); + // should succeed + log.info("Creating the topic again"); + this.createPartitionedTopic(topic, numBrokers * 2); + } + } + + + private ContainerExecResult createTenantName(final String tenantName, + final String allowedClusterName, + final String adminRoleName) throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "tenants", "create", "--allowed-clusters", allowedClusterName, + "--admin-roles", adminRoleName, tenantName); + assertEquals(0, result.getExitCode()); + return result; + } + + private ContainerExecResult createNamespace(final String Ns) throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", + "create", + "--clusters", + pulsarCluster.getClusterName(), Ns); + assertEquals(0, result.getExitCode()); + return result; + } + + private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions) + throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "topics", + "create-partitioned-topic", + "--partitions", "" + numPartitions, + partitionedTopicName); + assertEquals(0, result.getExitCode()); + return result; + } + + +} From 15920addcf254c7d669533e25b211fcffb70aac5 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 25 Oct 2022 11:41:06 -0700 Subject: [PATCH 2/3] CR feedback --- .../pulsar/broker/resources/NamespaceResources.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 5a7c22c284d97..315ffa9ad13bd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -301,7 +301,9 @@ public CompletableFuture markPartitionedTopicDeletedAsync(TopicName tn) { if (tn.isPartitioned()) { return CompletableFuture.completedFuture(null); } - log.debug("markPartitionedTopicDeletedAsync {}", tn); + if (log.isDebugEnabled()) { + log.debug("markPartitionedTopicDeletedAsync {}", tn); + } return updatePartitionedTopicAsync(tn, md -> { md.deleted = true; return md; @@ -312,7 +314,9 @@ public CompletableFuture unmarkPartitionedTopicDeletedAsync(TopicName tn) if (tn.isPartitioned()) { return CompletableFuture.completedFuture(null); } - log.debug("unmarkPartitionedTopicDeletedAsync {}", tn); + if (log.isDebugEnabled()) { + log.debug("unmarkPartitionedTopicDeletedAsync {}", tn); + } return updatePartitionedTopicAsync(tn, md -> { md.deleted = false; return md; From 78b899ecd504614dd50d2fdc231d3a45744b3acf Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 25 Oct 2022 11:47:02 -0700 Subject: [PATCH 3/3] fix license issue: coupleof files without changes in format https://github.com/apache/pulsar/pull/18177 --- .../org/apache/pulsar/tests/integration/tls/ClientTlsTest.java | 2 +- .../pulsar/tests/integration/topics/TestTopicDeletion.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java index fe5c3ad40b5e1..59ff978cafa06 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/tls/ClientTlsTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java index 0adb414e8f4f6..044d17bac6156 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information