Skip to content

Commit

Permalink
Close topics after creating all subscriptions on increment-partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Sep 21, 2017
1 parent 94b1fb9 commit 3df445c
Showing 1 changed file with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1303,20 +1303,23 @@ private CompletableFuture<Void> createSubscriptions(DestinationName dn, int numP

// get list of cursors name of partition-1
final String ledgerName = dn.getPartition(1).getPersistenceNamingEncoding();
final Set<Topic> topics = Sets.newConcurrentHashSet();
((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName,
new MetaStoreCallback<List<String>>() {

@Override
public void operationComplete(List<String> cursors,
org.apache.bookkeeper.mledger.impl.MetaStore.Stat stat) {
List<CompletableFuture<Void>> topicCreationFuture = Lists.newArrayList();
List<CompletableFuture<Void>> subscriptionCreationFuture = Lists.newArrayList();
// create subscriptions for all new partition-topics
cursors.forEach(cursor -> {
String subName = Codec.decode(cursor);
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicName = dn.getPartition(i).toString();
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> {
// cache topic to close all of them after creating all subscriptions
topics.add(topic);
if (ex != null) {
log.warn("[{}] Failed to create topic {}", clientAppId(), topicName);
future.completeExceptionally(ex);
Expand All @@ -1329,29 +1332,37 @@ public void operationComplete(List<String> cursors,
future.completeExceptionally(e);
return null;
} else {
log.info("[{}] Successfully create subsciption {} {}",
log.info("[{}] Successfully created subsciption {} {}",
clientAppId(), topicName, subName);
// close topic
topic.close();
future.complete(null);
return null;
}
});
return null;
}
});
topicCreationFuture.add(future);
subscriptionCreationFuture.add(future);
}
});
// wait for all subscriptions to be created
FutureUtil.waitForAll(topicCreationFuture).handle((res, e) -> {
if (e != null) {
result.completeExceptionally(e);
} else {
log.info("[{}] Successfully create new partitions {}", clientAppId(),
dn.toString());
result.complete(null);
}
FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, subscriptionException) -> {
// close all topics and then complete result future
FutureUtil.waitForAll(
topics.stream().map(topic -> topic.close()).collect(Collectors.toList()))
.handle((closed, topicCloseException) -> {
if (topicCloseException != null) {
log.warn("Failed to close newly created partitioned topics for {} ", dn,
topicCloseException);
}
if (subscriptionException != null) {
result.completeExceptionally(subscriptionException);
} else {
log.info("[{}] Successfully created new partitions {}", clientAppId(),
dn.toString());
result.complete(null);
}
return null;
});
return null;
});
}
Expand Down

0 comments on commit 3df445c

Please sign in to comment.