diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java index d012fcd4efc19..62a3e76c2d52a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java @@ -1303,13 +1303,14 @@ private CompletableFuture createSubscriptions(DestinationName dn, int numP // get list of cursors name of partition-1 final String ledgerName = dn.getPartition(1).getPersistenceNamingEncoding(); + final Set topics = Sets.newConcurrentHashSet(); ((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName, new MetaStoreCallback>() { @Override public void operationComplete(List cursors, org.apache.bookkeeper.mledger.impl.MetaStore.Stat stat) { - List> topicCreationFuture = Lists.newArrayList(); + List> subscriptionCreationFuture = Lists.newArrayList(); // create subscriptions for all new partition-topics cursors.forEach(cursor -> { String subName = Codec.decode(cursor); @@ -1317,6 +1318,8 @@ public void operationComplete(List cursors, final String topicName = dn.getPartition(i).toString(); CompletableFuture future = new CompletableFuture<>(); pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> { + // cache topic to close all of them after creating all subscriptions + topics.add(topic); if (ex != null) { log.warn("[{}] Failed to create topic {}", clientAppId(), topicName); future.completeExceptionally(ex); @@ -1329,10 +1332,8 @@ public void operationComplete(List cursors, future.completeExceptionally(e); return null; } else { - log.info("[{}] Successfully create subsciption {} {}", + log.info("[{}] Successfully created subsciption {} {}", clientAppId(), topicName, subName); - // close topic - topic.close(); future.complete(null); return null; } @@ -1340,18 +1341,28 @@ public void operationComplete(List cursors, return null; } }); - topicCreationFuture.add(future); + subscriptionCreationFuture.add(future); } }); // wait for all subscriptions to be created - FutureUtil.waitForAll(topicCreationFuture).handle((res, e) -> { - if (e != null) { - result.completeExceptionally(e); - } else { - log.info("[{}] Successfully create new partitions {}", clientAppId(), - dn.toString()); - result.complete(null); - } + FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, subscriptionException) -> { + // close all topics and then complete result future + FutureUtil.waitForAll( + topics.stream().map(topic -> topic.close()).collect(Collectors.toList())) + .handle((closed, topicCloseException) -> { + if (topicCloseException != null) { + log.warn("Failed to close newly created partitioned topics for {} ", dn, + topicCloseException); + } + if (subscriptionException != null) { + result.completeExceptionally(subscriptionException); + } else { + log.info("[{}] Successfully created new partitions {}", clientAppId(), + dn.toString()); + result.complete(null); + } + return null; + }); return null; }); }