From 3cd07b4a04ae5e207e4970a52f9734cd824cd40c Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Sat, 14 Oct 2023 05:35:36 -0700 Subject: [PATCH] [feat] [broker] PIP-188 Add support to auto create topic resources into green cluster before migration [part-3] (#21354) --- conf/broker.conf | 4 + conf/standalone.conf | 8 ++ .../pulsar/broker/ServiceConfiguration.java | 7 + .../service/persistent/PersistentTopic.java | 123 ++++++++++++++++-- .../broker/service/ClusterMigrationTest.java | 88 ++++++++++++- .../common/policies/data/ClusterData.java | 5 +- .../apache/pulsar/admin/cli/CmdClusters.java | 9 +- .../policies/data/ClusterDataImplTest.java | 3 +- 8 files changed, 232 insertions(+), 15 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 4ad8536fd8d68..ca407810a42fc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1593,6 +1593,10 @@ aggregatePublisherStatsByProducerName=false # if cluster is marked migrated. Disable with value 0. (Default disabled). clusterMigrationCheckDurationSeconds=0 +# Flag to start cluster migration for topic only after creating all topic's resources +# such as tenant, namespaces, subscriptions at new green cluster. (Default disabled). +clusterMigrationAutoResourceCreation=false + ### --- Schema storage --- ### # The schema storage implementation used by this broker schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory diff --git a/conf/standalone.conf b/conf/standalone.conf index 76223c5933e45..43455966c978f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -999,6 +999,14 @@ splitTopicAndPartitionLabelInPrometheus=false # Otherwise, aggregate it by list index. aggregatePublisherStatsByProducerName=false +# Interval between checks to see if cluster is migrated and marks topic migrated +# if cluster is marked migrated. Disable with value 0. (Default disabled). 
+clusterMigrationCheckDurationSeconds=0 + +# Flag to start cluster migration for topic only after creating all topic's resources +# such as tenant, namespaces, subscriptions at new green cluster. (Default disabled). +clusterMigrationAutoResourceCreation=false + ### --- Schema storage --- ### # The schema storage implementation used by this broker. schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c93b683255002..82ddedd89ac0b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2778,6 +2778,13 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private int clusterMigrationCheckDurationSeconds = 0; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Flag to start cluster migration for topic only after creating all topic's resources" + + " such as tenant, namespaces, subscriptions at new green cluster. (Default disabled)." 
+ ) + private boolean clusterMigrationAutoResourceCreation = false; + @FieldContext( category = CATEGORY_SCHEMA, doc = "Enforce schema validation on following cases:\n\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dfeb03a254698..1729ca878e7da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -77,6 +77,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; +import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -103,6 +104,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedSubscriptionException; import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; @@ -125,6 +127,8 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; +import org.apache.pulsar.client.admin.PulsarAdmin; +import 
org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -137,10 +141,12 @@
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
@@ -206,6 +212,11 @@ public static boolean isDedupCursorName(String name) {
 
     private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
 
+    // cluster name passed to getClusterPulsarAdmin() when requesting the admin client of the migration cluster
+    private static final String MIGRATION_CLUSTER_NAME = "migration-cluster";
+    // set by checkClusterMigration() once initMigration() has completed; initMigration() is a no-op afterwards
+    private volatile boolean migrationSubsCreated = false;
+
     // topic has every published chunked message since topic is loaded
     public boolean msgChunkPublished;
 
@@ -2582,16 +2593,134 @@ public CompletableFuture<Void> checkClusterMigration() {
         if (!clusterUrl.isPresent()) {
             return CompletableFuture.completedFuture(null);
         }
-        CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate() :
-                CompletableFuture.completedFuture(null);
-        return migrated.thenApply(__ -> {
-            subscriptions.forEach((name, sub) -> {
-                if (sub.isSubsciptionMigrated()) {
-                    sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
-                }
-            });
-            return null;
-        }).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions());
+        return initMigration().thenCompose(subCreated -> {
+            migrationSubsCreated = true;
+            CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate()
+                    : CompletableFuture.completedFuture(null);
+            return migrated.thenApply(__ -> {
+                subscriptions.forEach((name, sub) -> {
+                    if (sub.isSubsciptionMigrated()) {
+                        sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
+                    }
+                });
+                return null;
+            }).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions());
+        });
+    }
+
+    /**
+     * Initialize migration for a topic by creating the topic's resources (tenant, namespace and subscriptions)
+     * at the migration cluster.
+     *
+     * <p>Completes immediately when {@code migrationSubsCreated} is already set or when
+     * {@code clusterMigrationAutoResourceCreation} is disabled. A resource that already exists at the migration
+     * cluster (the admin call fails with a {@link ConflictException}) is treated as successfully created.
+     *
+     * @return a future that completes once the tenant, the namespace and all subscriptions have been created; it
+     *         fails with a {@link TopicMigratedException} when no migration url is configured, or with the admin
+     *         failure of the resource that could not be created
+     */
+    private CompletableFuture<Void> initMigration() {
+        if (migrationSubsCreated) {
+            return CompletableFuture.completedFuture(null);
+        }
+        log.info("{} initializing subscription created at migration cluster", topic);
+        return getMigratedClusterUrlAsync(getBrokerService().getPulsar()).thenCompose(clusterUrl -> {
+            if (!brokerService.getPulsar().getConfig().isClusterMigrationAutoResourceCreation()) {
+                return CompletableFuture.completedFuture(null);
+            }
+            if (!clusterUrl.isPresent()) {
+                return FutureUtil
+                        .failedFuture(new TopicMigratedException("cluster migration service-url is not configured"));
+            }
+            ClusterUrl url = clusterUrl.get();
+            ClusterData clusterData = ClusterData.builder().serviceUrl(url.getServiceUrl())
+                    .serviceUrlTls(url.getServiceUrlTls()).brokerServiceUrl(url.getBrokerServiceUrl())
+                    .brokerServiceUrlTls(url.getBrokerServiceUrlTls()).build();
+            PulsarAdmin admin = getBrokerService().getClusterPulsarAdmin(MIGRATION_CLUSTER_NAME,
+                    Optional.of(clusterData));
+
+            // namespace creation
+            final String tenant = TopicName.get(topic).getTenant();
+            final NamespaceName ns = TopicName.get(topic).getNamespaceObject();
+            List<CompletableFuture<Void>> subResults = new ArrayList<>();
+
+            return brokerService.getPulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                    .thenCompose(tenantInfo -> {
+                        if (!tenantInfo.isPresent()) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                        CompletableFuture<Void> ts = new CompletableFuture<>();
+                        admin.tenants().createTenantAsync(tenant, tenantInfo.get()).handle((__, ex) -> {
+                            // unwrap instead of calling ex.getCause() directly: the cause can be null, and an
+                            // NPE thrown inside this callback would leave the future below incomplete forever
+                            Throwable failure = unwrapMigrationFailure(ex);
+                            if (failure == null || failure instanceof ConflictException) {
+                                log.info("[{}] successfully created tenant {} for migration", topic, tenant);
+                                ts.complete(null);
+                                return null;
+                            }
+                            log.warn("[{}] Failed to create tenant {} on migration cluster {}", topic, tenant,
+                                    failure.getMessage());
+                            ts.completeExceptionally(failure);
+                            return null;
+                        });
+                        return ts;
+                    }).thenCompose(t -> {
+                        return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+                                .getPoliciesAsync(ns).thenCompose(policies -> {
+                                    if (!policies.isPresent()) {
+                                        return CompletableFuture.completedFuture(null);
+                                    }
+                                    CompletableFuture<Void> nsFuture = new CompletableFuture<>();
+                                    admin.namespaces().createNamespaceAsync(ns.toString(), policies.get())
+                                            .handle((__, ex) -> {
+                                                Throwable failure = unwrapMigrationFailure(ex);
+                                                if (failure == null || failure instanceof ConflictException) {
+                                                    log.info("[{}] successfully created namespace {} for migration",
+                                                            topic, ns);
+                                                    nsFuture.complete(null);
+                                                    return null;
+                                                }
+                                                log.warn("[{}] Failed to create namespace {} on migration cluster {}",
+                                                        topic, ns, failure.getMessage());
+                                                nsFuture.completeExceptionally(failure);
+                                                return null;
+                                            });
+                                    return nsFuture;
+                                }).thenCompose(p -> {
+                                    subscriptions.forEach((subName, sub) -> {
+                                        CompletableFuture<Void> subResult = new CompletableFuture<>();
+                                        subResults.add(subResult);
+                                        admin.topics().createSubscriptionAsync(topic, subName, MessageId.earliest)
+                                                .handle((__, ex) -> {
+                                                    Throwable failure = unwrapMigrationFailure(ex);
+                                                    if (failure == null || failure instanceof ConflictException) {
+                                                        log.info("[{}] successfully created sub {} for migration",
+                                                                topic, subName);
+                                                        subResult.complete(null);
+                                                        return null;
+                                                    }
+                                                    log.warn("[{}] Failed to create sub {} on migration cluster, {}",
+                                                            topic, subName, failure.getMessage());
+                                                    subResult.completeExceptionally(failure);
+                                                    return null;
+                                                });
+                                    });
+                                    return Futures.waitForAll(subResults);
+                                });
+                    });
+        });
+    }
+
+    /**
+     * Returns the failure to report for an admin call made during migration: the cause of a
+     * {@link java.util.concurrent.CompletionException} when it has one, otherwise {@code ex} itself (which is
+     * {@code null} when the call succeeded).
+     */
+    private static Throwable unwrapMigrationFailure(Throwable ex) {
+        return ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null
+                ? ex.getCause() : ex;
     }
 
     private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index 248d87c631c4c..ec1cb7e4fc9e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -28,15 +28,18 @@
 
 import java.lang.reflect.Method;
 import java.net.URL;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -267,7 +270,8 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
         AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
         assertFalse(topic2.getProducers().isEmpty());
 
-        ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
+        ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
+                pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
         admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
 
         retryStrategically((test) -> {
@@ -454,7 +458,8 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc
         retryStrategically((test) -> topic2.getReplicators().size() == 1, 10, 2000);
         log.info("replicators should be ready");
 
-        ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
+        ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
+                pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
         admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
         log.info("update cluster migration called");
         retryStrategically((test) -> {
@@ -494,6 +499,85 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc
         assertEquals(topic2.getProducers().size(), 2);
     }
 
+    /**
+     * Validates that the blue cluster first creates the topic's tenant, namespace and subscriptions at the green
+     * cluster, so that the green cluster does not lose data once producers migrate.
+     *
+     * @throws Exception if the test setup or one of the admin/client calls fails
+     */
+    @Test
+    public void testClusterMigrationWithResourceCreated() throws Exception {
+        log.info("--- Starting testClusterMigrationWithResourceCreated ---");
+
+        String tenant = "pulsar2";
+        String namespace = tenant + "/migration";
+        String greenClusterName = pulsar2.getConfig().getClusterName();
+        String blueClusterName = pulsar1.getConfig().getClusterName();
+        admin1.clusters().createCluster(greenClusterName,
+                ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString())
+                        .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+                        .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build());
+        admin2.clusters().createCluster(blueClusterName,
+                ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString())
+                        .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+                        .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build());
+
+        admin1.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
+                Sets.newHashSet("r1", greenClusterName)));
+        // the tenant already exists at the green cluster, so migration must tolerate the conflicting creation
+        admin2.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
+                Sets.newHashSet("r1", greenClusterName)));
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", greenClusterName));
+
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/migrationTopic");
+
+        broker1.getPulsarService().getConfig().setClusterMigrationAutoResourceCreation(true);
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        // cluster-1 producer/consumer
+        @Cleanup
+        Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
+                .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        // create subscriptions
+        admin1.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        admin1.topics().createSubscription(topicName, "s2", MessageId.earliest);
+
+        ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
+                pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
+        admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
+
+        PersistentTopic topic1 = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null)
+                .get();
+        retryStrategically((test) -> {
+            try {
+                topic1.checkClusterMigration().get();
+                return true;
+            } catch (Exception e) {
+                // the check is retried, but log the failure so that a broken migration is visible in the output
+                log.warn("checkClusterMigration failed for {}, retrying", topicName, e);
+            }
+            return false;
+        }, 10, 500);
+
+        assertNotNull(admin2.tenants().getTenantInfo(tenant));
+        assertNotNull(admin2.namespaces().getPolicies(namespace));
+        List<String> subLists = admin2.topics().getSubscriptions(topicName);
+        assertTrue(subLists.contains("s1"));
+        assertTrue(subLists.contains("s2"));
+
+        int n = 5;
+        for (int i = 0; i < n; i++) {
+            producer1.send("test1".getBytes());
+        }
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
+        for (int i = 0; i < n; i++) {
+            assertNotNull(consumer1.receive());
+        }
+    }
+
     static class TestBroker extends MockedPulsarServiceBaseTest {
 
         private String clusterName;
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
index 212a1575f9934..0b3e5aa49cb83 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java
@@ -136,11 +136,19 @@ static Builder builder() {
     @NoArgsConstructor
     @AllArgsConstructor
     class ClusterUrl {
+        String serviceUrl;
+        String serviceUrlTls;
         String brokerServiceUrl;
         String brokerServiceUrlTls;
 
+        /**
+         * Checks whether no url has been provided at all.
+         *
+         * @return {@code true} if every service url and every broker-service url is {@code null}
+         */
         public boolean isEmpty() {
-            return brokerServiceUrl == null && brokerServiceUrlTls == null;
+            return serviceUrl == null && serviceUrlTls == null && brokerServiceUrl == null
+                    && brokerServiceUrlTls == null;
         }
     }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 1653de93a738c..c578876b382fb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -162,6 +162,13 @@ private class UpdateClusterMigration extends CliCommand {
         @Parameter(names = "--migrated", description = "Is cluster migrated", required = true)
         private boolean migrated;
 
+        @Parameter(names = "--service-url", description = "New migrated cluster service url", required = false)
+        private String serviceUrl;
+
+        @Parameter(names = "--service-url-secure",
+                description = "New migrated cluster service url secure", required = false)
+        private String serviceUrlTls;
+
         @Parameter(names = "--broker-url", description = "New migrated cluster broker service url", required = false)
         private String brokerServiceUrl;
 
@@ -171,7 +178,7 @@ private class UpdateClusterMigration extends CliCommand {
 
         void run() throws PulsarAdminException {
             String cluster = getOneArgument(params);
-            ClusterUrl clusterUrl = new ClusterUrl(brokerServiceUrl, brokerServiceUrlTls);
+            ClusterUrl clusterUrl = new ClusterUrl(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls);
             getAdmin().clusters().updateClusterMigration(cluster, migrated, clusterUrl);
         }
     }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
index ca4cba2cf9749..87e935ecf7360 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java
@@ -54,7 +54,8 @@ public void verifyClone() {
                 .brokerClientCertificateFilePath("/my/cert/file")
                 .listenerName("a-listener")
                 .migrated(true)
-                .migratedClusterUrl(new ClusterData.ClusterUrl("pulsar://remote", "pulsar+ssl://remote"))
+                .migratedClusterUrl(new ClusterData.ClusterUrl("http://remote", "https://remote", "pulsar://remote",
+                        "pulsar+ssl://remote"))
                 .build();
 
         ClusterDataImpl clone = originalData.clone().build();