diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 965c1e164a3fa..418da33aea0b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1143,6 +1143,9 @@ public CompletableFuture deleteForcefully() { * Flag indicating whether delete should succeed if topic still has unconnected subscriptions. Set to * false when called from admin API (it will delete the subs too), and set to true when called from GC * thread + * @param failIfHasBacklogs + * Flag indicating whether delete should succeed if topic has backlogs. Set to false when called from + * admin API (it will delete the subs too), and set to true when called from GC thread * @param closeIfClientsConnected * Flag indicate whether explicitly close connected * producers/consumers/replicators before trying to delete topic. @@ -1160,16 +1163,33 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, if (isClosingOrDeleting) { log.warn("[{}] Topic is already being closed or deleted", topic); return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced")); - } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) { - return FutureUtil.failedFuture( - new TopicBusyException("Topic has subscriptions: " + subscriptions.keys())); - } else if (failIfHasBacklogs && hasBacklogs()) { - List backlogSubs = - subscriptions.values().stream() - .filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0) - .map(PersistentSubscription::getName).toList(); - return FutureUtil.failedFuture( - new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs)); + } + // We can proceed with the deletion if either: + // 1. No one is connected and no subscriptions + // 2. The topic have subscriptions but no backlogs for all subscriptions + // if delete_when_no_subscriptions is applied + // 3. We want to kick out everyone and forcefully delete the topic. + // In this case, we shouldn't care if the usageCount is 0 or not, just proceed + if (!closeIfClientsConnected) { + if (failIfHasSubscriptions && !subscriptions.isEmpty()) { + return FutureUtil.failedFuture( + new TopicBusyException("Topic has subscriptions: " + subscriptions.keys())); + } else if (failIfHasBacklogs) { + if (hasBacklogs()) { + List backlogSubs = + subscriptions.values().stream() + .filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0) + .map(PersistentSubscription::getName).toList(); + return FutureUtil.failedFuture( + new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs)); + } else if (!producers.isEmpty()) { + return FutureUtil.failedFuture(new TopicBusyException( + "Topic has " + producers.size() + " connected producers")); + } + } else if (currentUsageCount() > 0) { + return FutureUtil.failedFuture(new TopicBusyException( + "Topic has " + currentUsageCount() + " connected producers/consumers")); + } } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting @@ -1179,94 +1199,82 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, CompletableFuture deleteFuture = new CompletableFuture<>(); CompletableFuture closeClientFuture = new CompletableFuture<>(); + List> futures = new ArrayList<>(); + subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (closeIfClientsConnected) { - List> futures = new ArrayList<>(); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); - subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); - FutureUtil.waitForAll(futures).thenRun(() -> { - closeClientFuture.complete(null); - }).exceptionally(ex -> { - log.error("[{}] Error closing clients", topic, ex); - unfenceTopicToResume(); - closeClientFuture.completeExceptionally(ex); - return null; - }); - } else { - closeClientFuture.complete(null); } + FutureUtil.waitForAll(futures).thenRun(() -> { + closeClientFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}] Error closing clients", topic, ex); + unfenceTopicToResume(); + closeClientFuture.completeExceptionally(ex); + return null; + }); - closeClientFuture.thenAccept(delete -> { - // We can proceed with the deletion if either: - // 1. No one is connected - // 2. We want to kick out everyone and forcefully delete the topic. - // In this case, we shouldn't care if the usageCount is 0 or not, just proceed - if (currentUsageCount() == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) { - CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); - brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - - deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) - .thenCompose(__ -> deleteTopicPolicies()) - .thenCompose(__ -> transactionBufferCleanupAndClose()) - .whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Error deleting topic", topic, ex); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(ex); - } else { - List> subsDeleteFutures = new ArrayList<>(); - subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); - - FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { - if (e != null) { - log.error("[{}] Error deleting topic", topic, e); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(e); - } else { - ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); - - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - unregisterTopicPolicyListener(); - - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } + closeClientFuture.thenAccept(__ -> { + CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); + brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); + + deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema()) + .thenCompose(ignore -> deleteTopicPolicies()) + .thenCompose(ignore -> transactionBufferCleanupAndClose()) + .whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}] Error deleting topic", topic, ex); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(ex); + } else { + List> subsDeleteFutures = new ArrayList<>(); + subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); + + FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { + if (e != null) { + log.error("[{}] Error deleting topic", topic, e); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(e); + } else { + ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + brokerService.removeTopicFromCache(PersistentTopic.this); + + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + unregisterTopicPolicyListener(); + + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + } - @Override - public void - deleteLedgerFailed(ManagedLedgerException exception, - Object ctx) { - if (exception.getCause() - instanceof MetadataStoreException.NotFoundException) { - log.info("[{}] Topic is already deleted {}", - topic, exception.getMessage()); - deleteLedgerComplete(ctx); - } else { - unfenceTopicToResume(); - log.error("[{}] Error deleting topic", - topic, exception); - deleteFuture.completeExceptionally( - new PersistenceException(exception)); - } + @Override + public void + deleteLedgerFailed(ManagedLedgerException exception, + Object ctx) { + if (exception.getCause() + instanceof MetadataStoreException.NotFoundException) { + log.info("[{}] Topic is already deleted {}", + topic, exception.getMessage()); + deleteLedgerComplete(ctx); + } else { + unfenceTopicToResume(); + log.error("[{}] Error deleting topic", + topic, exception); + deleteFuture.completeExceptionally( + new PersistenceException(exception)); } - }, null); + } + }, null); - } - }); - } - }); - } else { - unfenceTopicToResume(); - deleteFuture.completeExceptionally(new TopicBusyException( - "Topic has " + currentUsageCount() + " connected producers/consumers")); - } + } + }); + } + }); }).exceptionally(ex->{ unfenceTopicToResume(); deleteFuture.completeExceptionally( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 8b4fe085407ed..f1dccce96c80c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -328,7 +328,6 @@ public void testDeleteWhenNoBacklogs() throws Exception { producer.send("Pulsar".getBytes()); } - consumer.close(); producer.close(); Thread.sleep(2000); @@ -338,6 +337,7 @@ public void testDeleteWhenNoBacklogs() throws Exception { admin.topics().skipAllMessages(topic, "sub"); Awaitility.await() .untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic))); + consumer.close(); } @Test