Skip to content

Commit

Permalink
[fix] Combination of autocreate + forced delete of partitioned topic …
Browse files Browse the repository at this point in the history
…with active consumer leaves topic metadata inconsistent. (apache#18193)

(cherry picked from commit 3fdbc9f)
  • Loading branch information
dlg99 committed Nov 8, 2022
1 parent 87a3323 commit 6e0a777
Show file tree
Hide file tree
Showing 19 changed files with 629 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -258,6 +259,79 @@ public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant);
return deleteIfExistsAsync(partitionedTopicPath);
}

public CompletableFuture<Void> markPartitionedTopicDeletedAsync(TopicName tn) {
if (tn.isPartitioned()) {
return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("markPartitionedTopicDeletedAsync {}", tn);
}
return updatePartitionedTopicAsync(tn, md -> {
md.deleted = true;
return md;
});
}

public CompletableFuture<Void> unmarkPartitionedTopicDeletedAsync(TopicName tn) {
if (tn.isPartitioned()) {
return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("unmarkPartitionedTopicDeletedAsync {}", tn);
}
return updatePartitionedTopicAsync(tn, md -> {
md.deleted = false;
return md;
});
}

public CompletableFuture<Boolean> isPartitionedTopicBeingDeletedAsync(TopicName tn) {
if (tn.isPartitioned()) {
tn = TopicName.get(tn.getPartitionedTopicName());
}
return getPartitionedTopicMetadataAsync(tn)
.thenApply(mdOpt -> mdOpt.map(partitionedTopicMetadata -> partitionedTopicMetadata.deleted)
.orElse(false));
}

public CompletableFuture<Void> runWithMarkDeleteAsync(TopicName topic,
Supplier<CompletableFuture<Void>> supplier) {
CompletableFuture<Void> future = new CompletableFuture<>();

markPartitionedTopicDeletedAsync(topic).whenCompleteAsync((markResult, markExc) -> {
final boolean mdFound;
if (markExc != null) {
if (markExc.getCause() instanceof MetadataStoreException.NotFoundException) {
mdFound = false;
} else {
log.error("Failed to mark the topic {} as deleted", topic, markExc);
future.completeExceptionally(markExc);
return;
}
} else {
mdFound = true;
}

supplier.get().whenComplete((deleteResult, deleteExc) -> {
if (deleteExc != null && mdFound) {
unmarkPartitionedTopicDeletedAsync(topic)
.thenRun(() -> future.completeExceptionally(deleteExc))
.exceptionally(ex -> {
log.warn("Failed to unmark the topic {} as deleted", topic, ex);
future.completeExceptionally(deleteExc);
return null;
});
} else if (deleteExc != null) {
future.completeExceptionally(deleteExc);
} else {
future.complete(deleteResult);
}
});
});

return future;
}
}

// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;

@Slf4j
public class TopicResources {
private static final String MANAGED_LEDGER_PATH = "/managed-ledgers";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,10 @@ private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String topicName : topicNames) {
futures.add(namespaceResources().getPartitionedTopicResources()
.deletePartitionedTopicAsync(TopicName.get(topicName)));
TopicName tn = TopicName.get(topicName);
futures.add(pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(tn,
() -> namespaceResources().getPartitionedTopicResources().deletePartitionedTopicAsync(tn)));
}
return FutureUtil.waitForAll(futures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,9 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
).thenCompose(__ -> namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName))
).thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName, () -> namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
.thenAccept(__ -> {
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
Expand Down Expand Up @@ -661,6 +662,13 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
}

private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int numPartitions, boolean force) {
return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName,
() -> internalRemovePartitionsTopicNoAutocreationDisableAsync(numPartitions, force));
}

private CompletableFuture<Void> internalRemovePartitionsTopicNoAutocreationDisableAsync(int numPartitions,
boolean force) {
return FutureUtil.waitForAll(IntStream.range(0, numPartitions)
.mapToObj(i -> {
TopicName topicNamePartition = topicName.getPartition(i);
Expand Down Expand Up @@ -2223,79 +2231,81 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
} else {
boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
final AtomicReference<Throwable> partitionException = new AtomicReference<>();
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(allowAutoTopicCreation -> getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
final AtomicReference<Throwable> partitionException = new AtomicReference<>();

// Create the subscription on each partition
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
subscriptionName, targetMessageId, false, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
// if all the partitioned failed due to
// subscription-already-exist
if (failureCount.incrementAndGet() == numPartitions
|| !(ex instanceof PulsarAdminException
.ConflictException)) {
partitionException.set(ex);
}
}

// Create the subscription on each partition
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
subscriptionName, targetMessageId, false, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
// if all the partitioned failed due to
// subscription-already-exist
if (failureCount.incrementAndGet() == numPartitions
|| !(ex instanceof PulsarAdminException.ConflictException)) {
partitionException.set(ex);
}
}
if (count.decrementAndGet() == 0) {
future.complete(null);
}

if (count.decrementAndGet() == 0) {
future.complete(null);
}
return null;
});
} catch (Exception e) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
clientAppId(), topicNamePartition, subscriptionName, targetMessageId, e);
future.completeExceptionally(e);
}
}

return null;
});
} catch (Exception e) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
topicNamePartition, subscriptionName, targetMessageId, e);
future.completeExceptionally(e);
}
}
future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}

future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}
if (partitionException.get() != null) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
if (partitionException.get() instanceof PulsarAdminException) {
asyncResponse.resume(
new RestException((PulsarAdminException) partitionException.get()));
return;
} else {
asyncResponse.resume(new RestException(partitionException.get()));
return;
}
}

if (partitionException.get() != null) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
if (partitionException.get() instanceof PulsarAdminException) {
asyncResponse.resume(
new RestException((PulsarAdminException) partitionException.get()));
return;
} else {
asyncResponse.resume(new RestException(partitionException.get()));
return;
}
asyncResponse.resume(Response.noContent().build());
});
} else {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
}

asyncResponse.resume(Response.noContent().build());
});
} else {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
}
}).exceptionally(ex -> {
})).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to create subscription {} on topic {}",
Expand All @@ -2321,12 +2331,12 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated,
Map<String, String> properties) {

boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);

validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE);
return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation);
return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(isAllowAutoTopicCreation -> pulsar().getBrokerService()
.getTopic(topicName.toString(), isAllowAutoTopicCreation));
}).thenApply(optTopic -> {
if (optTopic.isPresent()) {
return optTopic.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topic
// Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists
// in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned,
// we'll return the true flag.
CompletableFuture<Boolean> existFuture = pulsar().getBrokerService()
.isAllowAutoTopicCreation(topicName)
|| (!topicName.isPersistent() && !topicName.isPartitioned())
CompletableFuture<Boolean> existFuture = (!topicName.isPersistent() && !topicName.isPartitioned())
? CompletableFuture.completedFuture(true)
: pulsar().getNamespaceService().checkTopicExists(topicName);
: pulsar().getNamespaceService().checkTopicExists(topicName)
.thenCompose(exists -> exists ? CompletableFuture.completedFuture(true)
: pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName));

return existFuture;
})
.thenCompose(exist -> {
Expand Down
Loading

0 comments on commit 6e0a777

Please sign in to comment.