Skip to content

Commit

Permalink
[fix][broker] Fix stuck when enable topic level replication and build…
Browse files Browse the repository at this point in the history
… remote admin fails (apache#23028)

(cherry picked from commit 88ebe78)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
poorbarcode authored and nodece committed Sep 9, 2024
1 parent 14e29a0 commit 85fd190
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -37,9 +38,11 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
Expand Down Expand Up @@ -577,6 +580,17 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n

provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly, properties)
.thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
if (!createLocalTopicOnly && topicName.isGlobal()) {
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
log.info("[{}] Successfully created partitioned for topic {} for the remote clusters",
clientAppId());
} else {
log.info("[{}] Skip creating partitioned for topic {} for the remote clusters",
clientAppId(), topicName);
}
asyncResponse.resume(Response.noContent().build());
})
.whenComplete((ignored, ex) -> {
if (ex != null) {
createLocalFuture.completeExceptionally(ex);
Expand All @@ -589,44 +603,113 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

List<String> replicatedClusters = new ArrayList<>();
if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
getNamespaceReplicatedClusters(namespaceName)
.stream().filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
.forEach(replicatedClusters::add);
}
createLocalFuture.whenComplete((ignored, ex) -> {
if (ex != null) {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
if (ex.getCause() instanceof RestException) {
asyncResponse.resume(ex.getCause());
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
}
return;
}
/**
 * Asynchronously loads the policies of the given namespace and yields their
 * {@code replication_clusters} value.
 *
 * @param namespaceName the namespace whose policies are looked up
 * @return a future holding the namespace's {@code replication_clusters}; the future fails with a
 *         {@link RestException} ({@code NOT_FOUND}) when no policies are present for the namespace
 */
protected CompletableFuture<Set<String>> getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
    return namespaceResources().getPoliciesAsync(namespaceName).thenApply(optionalPolicies -> {
        // Guard clause: absent policies are reported as a missing namespace.
        if (!optionalPolicies.isPresent()) {
            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
        }
        return optionalPolicies.get().replication_clusters;
    });
}

if (!replicatedClusters.isEmpty()) {
replicatedClusters.forEach(cluster -> {
pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
.thenAccept(clusterDataOp -> {
((TopicsImpl) pulsar().getBrokerService()
.getClusterPulsarAdmin(cluster, clusterDataOp).topics())
.createPartitionedTopicAsync(
topicName.getPartitionedTopicName(), numPartitions, true, null);
})
.exceptionally(throwable -> {
log.error("Failed to create partition topic in cluster {}.", cluster, throwable);
return null;
});
/**
 * Resolves the replication clusters of {@code namespaceName} and then starts the creation of the
 * partitioned topic on them. The work is fire-and-forget: the caller is not given a future to
 * wait on, so every outcome has to be reported through the log.
 *
 * @param numPartitions the partition count to create the topic with
 */
private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
    getNamespaceReplicatedClustersAsync(namespaceName)
            .thenAccept(clusters -> {
                // this call happens in the background without async composition. completion is logged.
                internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
            })
            .exceptionally(ex -> {
                // Nobody observes this future, so without this stage a failed policy lookup (or an
                // exception thrown while kicking off the remote creation) would vanish silently.
                log.error("[{}] Failed to start creating partitioned topic {} in the remote clusters of"
                        + " namespace {}", clientAppId(), topicName, namespaceName, ex);
                return null;
            });
}
}

log.info("[{}] Successfully created partitions for topic {} in cluster {}",
clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
asyncResponse.resume(Response.noContent().build());
});
/**
 * Starts, for every cluster in {@code clusters} other than the local one, an asynchronous creation
 * of the partitioned topic {@code topicName} on that cluster, and returns the per-cluster futures.
 *
 * <p>For each remote cluster the steps are: load the cluster data, obtain a {@link PulsarAdmin}
 * for that cluster, and call {@code createPartitionedTopicAsync}. If the remote cluster answers
 * with a {@link PulsarAdminException.ConflictException}, the remote topic's partitioned metadata
 * is fetched: the same partition count counts as success, a different one fails the future with
 * {@code PRECONDITION_FAILED}. Any other failure is wrapped in a {@link RestException}.
 *
 * @param clusters      the candidate clusters; the local cluster is skipped
 * @param numPartitions the partition count the topic is expected to have
 * @return a map from remote cluster name to the future tracking the creation on that cluster;
 *         every failure path completes the future exceptionally so it cannot get stuck
 */
protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
        Set<String> clusters, int numPartitions) {
    final String shortTopicName = topicName.getPartitionedTopicName();
    Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
    for (String cluster : clusters) {
        // The local cluster is not a remote target.
        if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
            continue;
        }
        ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
        CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
        tasksForAllClusters.put(cluster, createRemoteTopicFuture);
        clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
            if (ex1 != null) {
                // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck.
                log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster"
                        + " {}.", clientAppId(), topicName, cluster, ex1);
                createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
                return;
            }
            PulsarAdmin remotePulsarAdmin;
            try {
                // Building the remote admin may throw synchronously (e.g. on invalid cluster data);
                // it is caught here so the future is still completed.
                remotePulsarAdmin = pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData);
            } catch (Exception ex) {
                log.error("[{}] [{}] An un-expected error occurs when trying to create remote pulsar admin for"
                        + " cluster {}", clientAppId(), topicName, cluster, ex);
                createRemoteTopicFuture.completeExceptionally(new RestException(ex));
                return;
            }
            // Get cluster data success.
            TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics();
            // NOTE(review): the third argument (true) is presumably "createLocalTopicOnly", which would
            // stop the remote cluster from replicating the creation further - confirm against TopicsImpl.
            topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
                    .whenComplete((ignore, ex2) -> {
                        if (ex2 == null) {
                            // Create success.
                            log.info("[{}] Successfully created partitioned topic {} in cluster {}",
                                    clientAppId(), topicName, cluster);
                            createRemoteTopicFuture.complete(null);
                            return;
                        }
                        // Create topic on the remote cluster error.
                        Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
                        // The topic has been created before, check the partitions count is expected.
                        if (unwrapEx2 instanceof PulsarAdminException.ConflictException) {
                            topics.getPartitionedTopicMetadataAsync(shortTopicName)
                                    .whenComplete((topicMeta, ex3) -> {
                                        if (ex3 != null) {
                                            // Unexpected error, such as NPE. Catch all error to avoid the
                                            // "createRemoteTopicFuture" stuck.
                                            log.error(
                                                    "[{}] Failed to check remote-cluster's topic metadata when "
                                                            + "creating"
                                                            + " partitioned topic {} in cluster {}.",
                                                    clientAppId(), topicName, cluster, ex3);
                                            createRemoteTopicFuture.completeExceptionally(new RestException(ex3));
                                            // Stop here: the metadata is not available on failure, so
                                            // falling through would dereference it and throw inside
                                            // the callback.
                                            return;
                                        }
                                        // Call get partitioned metadata of remote cluster success.
                                        if (topicMeta.partitions == numPartitions) {
                                            log.info(
                                                    "[{}] Skip created partitioned topic {} in cluster {}, "
                                                            + "because that {}",
                                                    clientAppId(), topicName, cluster, unwrapEx2.getMessage());
                                            createRemoteTopicFuture.complete(null);
                                        } else {
                                            String errorMsg =
                                                    String.format("[%s] There is an exists topic %s with different"
                                                                    + " partitions %s on the remote cluster %s, "
                                                                    + "you want to create it"
                                                                    + " with partitions %s",
                                                            clientAppId(), shortTopicName, topicMeta.partitions,
                                                            cluster,
                                                            numPartitions);
                                            log.error(errorMsg);
                                            createRemoteTopicFuture.completeExceptionally(
                                                    new RestException(Status.PRECONDITION_FAILED, errorMsg));
                                        }
                                    });
                        } else {
                            // An HTTP error was responded from the remote cluster.
                            log.error("[{}] Failed to create partitioned topic {} in cluster {}.",
                                    clientAppId(), topicName, cluster, ex2);
                            createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2));
                        }
                    });
        });
    }
    return tasksForAllClusters;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand All @@ -39,6 +42,8 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -257,4 +262,29 @@ public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationL
});
}
}

/**
 * Registers a cluster whose {@link ClusterData} has no fields set, allows it for the default
 * tenant, and then enables topic-level replication to it. The test passes when
 * {@code setReplicationClusters} returns within the timeout instead of getting stuck.
 */
@Test(timeOut = 30 * 1000)
public void testCreateRemoteAdminFailed() throws Exception {
    final TenantInfo tenantInfo = admin1.tenants().getTenantInfo(defaultTenant);
    final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
    final String randomClusterName = "c_" + UUID.randomUUID().toString().replace("-", "");
    final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp");
    admin1.namespaces().createNamespace(ns1);
    admin1.topics().createPartitionedTopic(topic, 2);

    try {
        // Inject a wrong cluster data which with empty fields.
        ClusterResources clusterResources = broker1.getPulsar().getPulsarResources().getClusterResources();
        clusterResources.createCluster(randomClusterName, ClusterData.builder().build());
        Set<String> allowedClusters = new HashSet<>(tenantInfo.getAllowedClusters());
        allowedClusters.add(randomClusterName);
        admin1.tenants().updateTenant(defaultTenant, TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles())
                .allowedClusters(allowedClusters).build());

        // Verify.
        admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, randomClusterName));
    } finally {
        // cleanup. Runs even when the scenario above fails, so the tenant (which other tests also
        // use) is always restored to its original definition.
        try {
            admin1.topics().deletePartitionedTopic(topic);
        } finally {
            admin1.tenants().updateTenant(defaultTenant, tenantInfo);
        }
    }
}
}

0 comments on commit 85fd190

Please sign in to comment.