diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index af21951851007..10d2a14612066 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.errorprone.annotations.CanIgnoreReturnValue; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -37,9 +38,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; @@ -577,6 +580,17 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly, properties) .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions)) + .thenRun(() -> { + if (!createLocalTopicOnly && topicName.isGlobal()) { + internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions); + log.info("[{}] Successfully created partitioned for topic {} for the remote clusters", + clientAppId()); + } else { + log.info("[{}] Skip creating partitioned for topic {} for the remote clusters", + clientAppId(), topicName); + } + asyncResponse.resume(Response.noContent().build()); + }) .whenComplete((ignored, ex) -> { if (ex != null) { createLocalFuture.completeExceptionally(ex); @@ -589,44 +603,113 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); + } - List replicatedClusters = new ArrayList<>(); - if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) { - getNamespaceReplicatedClusters(namespaceName) - .stream().filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName())) - .forEach(replicatedClusters::add); - } - createLocalFuture.whenComplete((ignored, ex) -> { - if (ex != null) { - log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause()); - if (ex.getCause() instanceof RestException) { - asyncResponse.resume(ex.getCause()); - } else { - resumeAsyncResponseExceptionally(asyncResponse, ex.getCause()); - } - return; - } + protected CompletableFuture> getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) { + return namespaceResources().getPoliciesAsync(namespaceName) + .thenApply(policies -> { + if (policies.isPresent()) { + return policies.get().replication_clusters; + } else { + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } + }); + } - if (!replicatedClusters.isEmpty()) { - replicatedClusters.forEach(cluster -> { - pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster) - .thenAccept(clusterDataOp -> { - ((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), numPartitions, true, null); - }) - .exceptionally(throwable -> { - log.error("Failed to create partition topic in cluster {}.", cluster, throwable); - return null; - }); + private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { + getNamespaceReplicatedClustersAsync(namespaceName) + .thenAccept(clusters -> { + // this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); }); - } + } - log.info("[{}] Successfully created partitions for topic {} in cluster {}", - clientAppId(), topicName, pulsar().getConfiguration().getClusterName()); - asyncResponse.resume(Response.noContent().build()); - }); + protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( + Set clusters, int numPartitions) { + final String shortTopicName = topicName.getPartitionedTopicName(); + Map> tasksForAllClusters = new HashMap<>(); + for (String cluster : clusters) { + if (cluster.equals(pulsar().getConfiguration().getClusterName())) { + continue; + } + ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); + CompletableFuture createRemoteTopicFuture = new CompletableFuture<>(); + tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { + if (ex1 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. + log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" + + " {}.", clientAppId(), topicName, cluster, ex1); + createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); + return; + } + PulsarAdmin remotePulsarAdmin; + try { + remotePulsarAdmin = pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData); + } catch (Exception ex) { + log.error("[{}] [{}] An un-expected error occurs when trying to create remote pulsar admin for" + + " cluster {}", clientAppId(), topicName, cluster, ex); + createRemoteTopicFuture.completeExceptionally(new RestException(ex)); + return; + } + // Get cluster data success. + TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics(); + topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) + .whenComplete((ignore, ex2) -> { + if (ex2 == null) { + // Create success. + log.info("[{}] Successfully created partitioned topic {} in cluster {}", + clientAppId(), topicName, cluster); + createRemoteTopicFuture.complete(null); + return; + } + // Create topic on the remote cluster error. + Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); + // The topic has been created before, check the partitions count is expected. + if (unwrapEx2 instanceof PulsarAdminException.ConflictException) { + topics.getPartitionedTopicMetadataAsync(shortTopicName) + .whenComplete((topicMeta, ex3) -> { + if (ex3 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the + // "createRemoteTopicFuture" stuck. + log.error( + "[{}] Failed to check remote-cluster's topic metadata when " + + "creating" + + " partitioned topic {} in cluster {}.", + clientAppId(), topicName, cluster, ex3); + createRemoteTopicFuture.completeExceptionally(new RestException(ex3)); + } + // Call get partitioned metadata of remote cluster success. + if (topicMeta.partitions == numPartitions) { + log.info( + "[{}] Skip created partitioned topic {} in cluster {}, " + + "because that {}", + clientAppId(), topicName, cluster, unwrapEx2.getMessage()); + createRemoteTopicFuture.complete(null); + } else { + String errorMsg = + String.format("[%s] There is an exists topic %s with different" + + " partitions %s on the remote cluster %s, " + + "you want to create it" + + " with partitions %s", + clientAppId(), shortTopicName, topicMeta.partitions, + cluster, + numPartitions); + log.error(errorMsg); + createRemoteTopicFuture.completeExceptionally( + new RestException(Status.PRECONDITION_FAILED, errorMsg)); + } + }); + } else { + // An HTTP error was responded from the remote cluster. + log.error("[{}] Failed to create partitioned topic {} in cluster {}.", + clientAppId(), topicName, cluster, ex2); + createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2)); + } + }); + }); + } + return tasksForAllClusters; } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index c1cf2f85f0970..8700faac6f6e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -24,11 +24,14 @@ import java.lang.reflect.Field; import java.time.Duration; import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -39,6 +42,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; @@ -257,4 +262,29 @@ public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationL }); } } + + @Test(timeOut = 30 * 1000) + public void testCreateRemoteAdminFailed() throws Exception { + final TenantInfo tenantInfo = admin1.tenants().getTenantInfo(defaultTenant); + final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + final String randomClusterName = "c_" + UUID.randomUUID().toString().replace("-", ""); + final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp"); + admin1.namespaces().createNamespace(ns1); + admin1.topics().createPartitionedTopic(topic, 2); + + // Inject a wrong cluster data which with empty fields. + ClusterResources clusterResources = broker1.getPulsar().getPulsarResources().getClusterResources(); + clusterResources.createCluster(randomClusterName, ClusterData.builder().build()); + Set allowedClusters = new HashSet<>(tenantInfo.getAllowedClusters()); + allowedClusters.add(randomClusterName); + admin1.tenants().updateTenant(defaultTenant, TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles()) + .allowedClusters(allowedClusters).build()); + + // Verify. + admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, randomClusterName)); + + // cleanup. + admin1.topics().deletePartitionedTopic(topic); + admin1.tenants().updateTenant(defaultTenant, tenantInfo); + } }