Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. #18193

Merged
merged 3 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -295,6 +296,79 @@ public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant);
return deleteIfExistsAsync(partitionedTopicPath);
}

/**
 * Sets the {@code deleted} flag on the partitioned-topic metadata of the given topic.
 *
 * <p>When {@code tn.isPartitioned()} is true nothing is updated and an already-completed
 * future is returned. (Presumably that means the name refers to a single partition rather
 * than to the parent partitioned topic; confirm against {@code TopicName}.)
 *
 * @param tn the topic whose metadata should be flagged
 * @return a future completing when the metadata update finishes, or immediately for the no-op case
 */
public CompletableFuture<Void> markPartitionedTopicDeletedAsync(TopicName tn) {
    if (!tn.isPartitioned()) {
        if (log.isDebugEnabled()) {
            log.debug("markPartitionedTopicDeletedAsync {}", tn);
        }
        return updatePartitionedTopicAsync(tn, metadata -> {
            metadata.deleted = true;
            return metadata;
        });
    }
    // Nothing to flag for this kind of name.
    return CompletableFuture.completedFuture(null);
}

/**
 * Clears the {@code deleted} flag on the partitioned-topic metadata of the given topic.
 *
 * <p>When {@code tn.isPartitioned()} is true nothing is updated and an already-completed
 * future is returned.
 *
 * @param tn the topic whose metadata flag should be reset
 * @return a future completing when the metadata update finishes, or immediately for the no-op case
 */
public CompletableFuture<Void> unmarkPartitionedTopicDeletedAsync(TopicName tn) {
    if (!tn.isPartitioned()) {
        if (log.isDebugEnabled()) {
            log.debug("unmarkPartitionedTopicDeletedAsync {}", tn);
        }
        return updatePartitionedTopicAsync(tn, metadata -> {
            metadata.deleted = false;
            return metadata;
        });
    }
    // Nothing to reset for this kind of name.
    return CompletableFuture.completedFuture(null);
}

/**
 * Reports whether the partitioned-topic metadata for the given topic carries the
 * {@code deleted} flag.
 *
 * <p>If {@code tn.isPartitioned()} is true, the lookup is done on the topic obtained from
 * {@code tn.getPartitionedTopicName()} instead of on {@code tn} itself.
 *
 * @param tn the topic (or partition) to check
 * @return a future yielding the {@code deleted} flag, or {@code false} when no metadata is present
 */
public CompletableFuture<Boolean> isPartitionedTopicBeingDeletedAsync(TopicName tn) {
    final TopicName lookupTopic = tn.isPartitioned()
            ? TopicName.get(tn.getPartitionedTopicName())
            : tn;
    return getPartitionedTopicMetadataAsync(lookupTopic)
            .thenApply(optMetadata -> optMetadata.map(metadata -> metadata.deleted).orElse(false));
}

/**
 * Runs a deletion step while the topic's partitioned metadata is flagged as deleted.
 *
 * <p>Sequence, as implemented below:
 * <ol>
 *   <li>Call {@code markPartitionedTopicDeletedAsync(topic)}.</li>
 *   <li>If marking failed and the failure's cause is a
 *       {@code MetadataStoreException.NotFoundException}, continue without metadata
 *       ({@code mdFound = false}). Any other marking failure is logged and fails the
 *       returned future without invoking {@code supplier}.</li>
 *   <li>Invoke {@code supplier.get()} and wait for its future.</li>
 *   <li>If that future failed and metadata had been found, attempt to unmark the topic and
 *       then fail the returned future with the supplier's exception, whether or not the
 *       unmark succeeded. If it failed and no metadata was found, fail with the supplier's
 *       exception directly. Otherwise complete with the supplier's result.</li>
 * </ol>
 *
 * @param topic the topic to flag while {@code supplier} runs
 * @param supplier produces the asynchronous operation to run while the flag is set
 * @return a future that reflects the outcome of the supplied operation (or of the marking step
 *         if marking fails for a reason other than missing metadata)
 */
public CompletableFuture<Void> runWithMarkDeleteAsync(TopicName topic,
Supplier<CompletableFuture<Void>> supplier) {
CompletableFuture<Void> future = new CompletableFuture<>();

// No executor is passed to whenCompleteAsync, so the continuation runs on the
// CompletableFuture default asynchronous execution facility.
markPartitionedTopicDeletedAsync(topic).whenCompleteAsync((markResult, markExc) -> {
// Records whether there is metadata that may need to be unmarked later.
final boolean mdFound;
if (markExc != null) {
// NOTE(review): only the cause is inspected. If markExc were itself a NotFoundException
// (i.e. not wrapped), this branch would not match and the call would fail below;
// confirm the upstream future always wraps the exception.
if (markExc.getCause() instanceof MetadataStoreException.NotFoundException) {
mdFound = false;
} else {
log.error("Failed to mark the topic {} as deleted", topic, markExc);
future.completeExceptionally(markExc);
return;
}
} else {
mdFound = true;
}

// NOTE(review): a synchronous throw from supplier.get() (or a null return) is not caught
// here, in which case `future` would never be completed; verify suppliers cannot do that.
supplier.get().whenComplete((deleteResult, deleteExc) -> {
if (deleteExc != null && mdFound) {
// The operation failed after the flag was set: roll the flag back before reporting.
unmarkPartitionedTopicDeletedAsync(topic)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the broker is down at this point, we should design a mechanism to reset the meta of this topic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@poorbarcode can simply delete the topic again.

.thenRun(() -> future.completeExceptionally(deleteExc))
.exceptionally(ex -> {
// The unmark failure is only logged; the caller still receives the original deleteExc.
log.warn("Failed to unmark the topic {} as deleted", topic, ex);
future.completeExceptionally(deleteExc);
return null;
});
} else if (deleteExc != null) {
// No metadata was found earlier, so there is nothing to unmark.
future.completeExceptionally(deleteExc);
} else {
future.complete(deleteResult);
}
});
});

return future;
}
}

// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;

@Slf4j
public class TopicResources {
private static final String MANAGED_LEDGER_PATH = "/managed-ledgers";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,10 @@ private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String topicName : topicNames) {
futures.add(namespaceResources().getPartitionedTopicResources()
.deletePartitionedTopicAsync(TopicName.get(topicName)));
TopicName tn = TopicName.get(topicName);
futures.add(pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(tn,
() -> namespaceResources().getPartitionedTopicResources().deletePartitionedTopicAsync(tn)));
}
return FutureUtil.waitForAll(futures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,8 +745,9 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
).thenCompose(__ -> namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName))
).thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName, () -> namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
.thenAccept(__ -> {
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
Expand Down Expand Up @@ -780,6 +781,13 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
}

/**
 * Removes the partitions of {@code topicName} by passing the removal step to
 * {@code runWithMarkDeleteAsync}, which runs it as the supplied operation.
 *
 * @param numPartitions forwarded to {@code internalRemovePartitionsTopicNoAutocreationDisableAsync}
 * @param force forwarded to {@code internalRemovePartitionsTopicNoAutocreationDisableAsync}
 * @return the future returned by {@code runWithMarkDeleteAsync}
 */
private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int numPartitions, boolean force) {
    return pulsar()
            .getPulsarResources()
            .getNamespaceResources()
            .getPartitionedTopicResources()
            .runWithMarkDeleteAsync(topicName, () -> {
                // Deferred: only invoked by runWithMarkDeleteAsync.
                return internalRemovePartitionsTopicNoAutocreationDisableAsync(numPartitions, force);
            });
}

private CompletableFuture<Void> internalRemovePartitionsTopicNoAutocreationDisableAsync(int numPartitions,
boolean force) {
return FutureUtil.waitForAll(IntStream.range(0, numPartitions)
.mapToObj(i -> {
TopicName topicNamePartition = topicName.getPartition(i);
Expand Down Expand Up @@ -2304,79 +2312,81 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
} else {
boolean allowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);
getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
final AtomicReference<Throwable> partitionException = new AtomicReference<>();
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(allowAutoTopicCreation -> getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
final AtomicReference<Throwable> partitionException = new AtomicReference<>();

// Create the subscription on each partition
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
subscriptionName, targetMessageId, false, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
// if all the partitions failed due to
// subscription-already-exist
if (failureCount.incrementAndGet() == numPartitions
|| !(ex instanceof PulsarAdminException
.ConflictException)) {
partitionException.set(ex);
}
}

// Create the subscription on each partition
for (int i = 0; i < numPartitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
pulsar().getAdminClient().topics()
.createSubscriptionAsync(topicNamePartition.toString(),
subscriptionName, targetMessageId, false, properties)
.handle((r, ex) -> {
if (ex != null) {
// fail the operation on unknown exception or
// if all the partitions failed due to
// subscription-already-exist
if (failureCount.incrementAndGet() == numPartitions
|| !(ex instanceof PulsarAdminException.ConflictException)) {
partitionException.set(ex);
}
}
if (count.decrementAndGet() == 0) {
future.complete(null);
}

if (count.decrementAndGet() == 0) {
future.complete(null);
}
return null;
});
} catch (Exception e) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
clientAppId(), topicNamePartition, subscriptionName, targetMessageId, e);
future.completeExceptionally(e);
}
}

return null;
});
} catch (Exception e) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(),
topicNamePartition, subscriptionName, targetMessageId, e);
future.completeExceptionally(e);
}
}
future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}

future.whenComplete((r, ex) -> {
if (ex != null) {
if (ex instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) ex));
return;
} else {
asyncResponse.resume(new RestException(ex));
return;
}
}
if (partitionException.get() != null) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
if (partitionException.get() instanceof PulsarAdminException) {
asyncResponse.resume(
new RestException((PulsarAdminException) partitionException.get()));
return;
} else {
asyncResponse.resume(new RestException(partitionException.get()));
return;
}
}

if (partitionException.get() != null) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}",
clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
if (partitionException.get() instanceof PulsarAdminException) {
asyncResponse.resume(
new RestException((PulsarAdminException) partitionException.get()));
return;
} else {
asyncResponse.resume(new RestException(partitionException.get()));
return;
}
asyncResponse.resume(Response.noContent().build());
});
} else {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
}

asyncResponse.resume(Response.noContent().build());
});
} else {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated, properties);
}
}).exceptionally(ex -> {
})).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to create subscription {} on topic {}",
Expand All @@ -2402,12 +2412,12 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated,
Map<String, String> properties) {

boolean isAllowAutoTopicCreation = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName);

validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> {
validateTopicOperation(topicName, TopicOperation.SUBSCRIBE, subscriptionName);
return pulsar().getBrokerService().getTopic(topicName.toString(), isAllowAutoTopicCreation);
return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
.thenCompose(isAllowAutoTopicCreation -> pulsar().getBrokerService()
.getTopic(topicName.toString(), isAllowAutoTopicCreation));
}).thenApply(optTopic -> {
if (optTopic.isPresent()) {
return optTopic.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topic
// Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists
// in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned,
// we'll return the true flag.
CompletableFuture<Boolean> existFuture = pulsar().getBrokerService()
.isAllowAutoTopicCreation(topicName)
|| (!topicName.isPersistent() && !topicName.isPartitioned())
CompletableFuture<Boolean> existFuture = (!topicName.isPersistent() && !topicName.isPartitioned())
? CompletableFuture.completedFuture(true)
: pulsar().getNamespaceService().checkTopicExists(topicName);
: pulsar().getNamespaceService().checkTopicExists(topicName)
.thenCompose(exists -> exists ? CompletableFuture.completedFuture(true)
: pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName));

return existFuture;
})
.thenCompose(exist -> {
Expand Down
Loading