diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 8ab1f4dc86002..a8f1af1d34f91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2642,4 +2642,18 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu return null; }); } + + protected void internalEnableMigration(boolean migrated) { + validateSuperUserAccess(); + try { + updatePolicies(namespaceName, policies -> { + policies.isMigrated = migrated; + return policies; + }); + log.info("Successfully updated migration on namespace {}", namespaceName); + } catch (Exception e) { + log.error("Failed to update migration on namespace {}", namespaceName, e); + throw new RestException(e); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 234d77251139b..b188750252ab3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1709,5 +1709,18 @@ public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String internalSetSchemaAutoUpdateCompatibilityStrategy(strategy); } + @POST + @Path("/{property}/{cluster}/{namespace}/migration") + @ApiOperation(hidden = true, value = "Update migration for all topics in a namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) + public void enableMigration(@PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + boolean migrated) { + validateNamespaceName(property, cluster, namespace); + internalEnableMigration(migrated); + } + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index dfa040baec5cf..36df0f7e31a34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2779,7 +2779,17 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse, }); } - + @POST + @Path("/{tenant}/{namespace}/migration") + @ApiOperation(hidden = true, value = "Update migration for all topics in a namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) + public void enableMigration(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + boolean migrated) { + validateNamespaceName(tenant, namespace); + internalEnableMigration(migrated); + } private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 31e37d0f176d7..3cb396d7a4b41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1349,19 +1349,29 @@ public void updateBrokerSubscribeRate() { } public Optional getMigratedClusterUrl() { - return getMigratedClusterUrl(brokerService.getPulsar()); + return getMigratedClusterUrl(brokerService.getPulsar(), topic); } - public static CompletableFuture> getMigratedClusterUrlAsync(PulsarService pulsar) { + public static CompletableFuture> getMigratedClusterUrlAsync(PulsarService pulsar, + String topic) { return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName()) - .thenApply(clusterData -> (clusterData.isPresent() && clusterData.get().isMigrated()) + .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic), + ((clusterData, isNamespaceMigrationEnabled) + -> ((clusterData.isPresent() && clusterData.get().isMigrated()) + || isNamespaceMigrationEnabled) ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) - : Optional.empty()); + : Optional.empty())); } - public static Optional getMigratedClusterUrl(PulsarService pulsar) { + private static CompletableFuture isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) { + return pulsar.getPulsarResources().getNamespaceResources(). + getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) + .thenApply(policies -> policies.isPresent() && policies.get().isMigrated); + } + + public static Optional getMigratedClusterUrl(PulsarService pulsar, String topic) { try { - return getMigratedClusterUrlAsync(pulsar) + return getMigratedClusterUrlAsync(pulsar, topic) .get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS); } catch (Exception e) { log.warn("Failed to get migration cluster URL", e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0ca53eac2f4d7..68678efc29637 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -821,7 +821,8 @@ public void topicMigrated(Optional clusterUrl) { public boolean checkAndApplyTopicMigration() { if (subscription.isSubsciptionMigrated()) { - Optional clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar()); + Optional clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar(), + topicName); if (clusterUrl.isPresent()) { ClusterUrl url = clusterUrl.get(); cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 34542d56938b0..18bf62f1ef34a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1644,7 +1644,7 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ producers.remove(producerId, producerFuture); }).exceptionallyAsync(ex -> { if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) { - Optional clusterURL = getMigratedClusterUrl(service.getPulsar()); + Optional clusterURL = getMigratedClusterUrl(service.getPulsar(), topic.getName()); if (clusterURL.isPresent()) { if (topic.isReplicationBacklogExist()) { log.info("Topic {} is migrated but replication backlog exist: " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 639d2cfc5810f..54811da723808 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -156,11 +156,8 @@ public NonPersistentTopic(String topic, BrokerService brokerService) { } private CompletableFuture updateClusterMigrated() { - return getMigratedClusterUrlAsync(brokerService.getPulsar()).thenAccept(url -> migrated = url.isPresent()); - } - - private Optional getClusterMigrationUrl() { - return getMigratedClusterUrl(brokerService.getPulsar()); + return getMigratedClusterUrlAsync(brokerService.getPulsar(), topic) + .thenAccept(url -> migrated = url.isPresent()); } public CompletableFuture initialize() { @@ -332,7 +329,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta, MessageId.latest, DEFAULT_CONSUMER_EPOCH, schemaType); if (isMigrated()) { - consumer.topicMigrated(getClusterMigrationUrl()); + consumer.topicMigrated(getMigratedClusterUrl()); } addConsumerToSubscription(subscription, consumer).thenRun(() -> { @@ -949,7 +946,7 @@ public boolean isActive() { @Override public CompletableFuture checkClusterMigration() { - Optional url = getClusterMigrationUrl(); + Optional url = getMigratedClusterUrl(); if (url.isPresent()) { this.migrated = true; producers.forEach((__, producer) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1729ca878e7da..1ed7bafc14724 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2588,6 +2588,7 @@ private boolean hasBacklogs() { @Override public CompletableFuture checkClusterMigration() { Optional clusterUrl = getMigratedClusterUrl(); + if (!clusterUrl.isPresent()) { return CompletableFuture.completedFuture(null); } @@ -2614,13 +2615,13 @@ private CompletableFuture initMigration() { return CompletableFuture.completedFuture(null); } log.info("{} initializing subscription created at migration cluster", topic); - return getMigratedClusterUrlAsync(getBrokerService().getPulsar()).thenCompose(clusterUrl -> { + return getMigratedClusterUrlAsync(getBrokerService().getPulsar(), topic).thenCompose(clusterUrl -> { if (!brokerService.getPulsar().getConfig().isClusterMigrationAutoResourceCreation()) { return CompletableFuture.completedFuture(null); } if (!clusterUrl.isPresent()) { return FutureUtil - .failedFuture(new TopicMigratedException("cluster migration service-url is not configired")); + .failedFuture(new TopicMigratedException("cluster migration service-url is not configured")); } ClusterUrl url = clusterUrl.get(); ClusterData clusterData = ClusterData.builder().serviceUrl(url.getServiceUrl()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index ec1cb7e4fc9e8..b1fd11ba0ac42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -66,6 +66,7 @@ public class ClusterMigrationTest { protected String methodName; String namespace = "pulsar/migrationNs"; + String namespaceNotToMigrate = "pulsar/notToMigrateNs"; TestBroker broker1, broker2, broker3, broker4; URL url1; @@ -104,6 +105,24 @@ public Object[][] subscriptionTypes() { }; } + @DataProvider(name="NamespaceMigrationTopicSubscriptionTypes") + public Object[][] namespaceMigrationSubscriptionTypes() { + return new Object[][] { + {true, SubscriptionType.Shared, true, false}, + {true, SubscriptionType.Key_Shared, true, false}, + {true, SubscriptionType.Shared, false, true}, + {true, SubscriptionType.Key_Shared, false, true}, + {true, SubscriptionType.Shared, true, true}, + {true, SubscriptionType.Key_Shared, true, true}, + {false, SubscriptionType.Shared, true, false}, + {false, SubscriptionType.Key_Shared, true, false}, + {false, SubscriptionType.Shared, false, true}, + {false, SubscriptionType.Key_Shared,false, true}, + {false, SubscriptionType.Shared, true, true}, + {false, SubscriptionType.Key_Shared,true, true}, + }; + } + @BeforeMethod(alwaysRun = true, timeOut = 300000) public void setup() throws Exception { @@ -179,6 +198,9 @@ public void setup() throws Exception { admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", "r3")); admin3.namespaces().createNamespace(namespace); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r3")); + admin1.namespaces().createNamespace(namespaceNotToMigrate, Sets.newHashSet("r1", "r3")); + admin3.namespaces().createNamespace(namespaceNotToMigrate); + admin1.namespaces().setNamespaceReplicationClusters(namespaceNotToMigrate, Sets.newHashSet("r1", "r3")); // Setting r4 as replication cluster for r2 admin2.tenants().createTenant("pulsar", @@ -188,6 +210,9 @@ public void setup() throws Exception { admin2.namespaces().createNamespace(namespace, Sets.newHashSet("r2", "r4")); admin4.namespaces().createNamespace(namespace); admin2.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r2", "r4")); + admin2.namespaces().createNamespace(namespaceNotToMigrate, Sets.newHashSet("r2", "r4")); + admin4.namespaces().createNamespace(namespaceNotToMigrate); + admin2.namespaces().setNamespaceReplicationClusters(namespaceNotToMigrate, Sets.newHashSet("r2", "r4")); assertEquals(admin1.clusters().getCluster("r1").getServiceUrl(), url1.toString()); assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString()); @@ -458,10 +483,12 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc retryStrategically((test) -> topic2.getReplicators().size() == 1, 10, 2000); log.info("replicators should be ready"); + ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); admin1.clusters().updateClusterMigration("r1", true, migratedUrl); log.info("update cluster migration called"); + retryStrategically((test) -> { try { topic1.checkClusterMigration().get(); @@ -578,6 +605,434 @@ public void testClusterMigrationWithResourceCreated() throws Exception { producer1.close(); } + @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes") + public void testNamespaceMigration(boolean persistent, SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { + log.info("--- Starting Test::testNamespaceMigration ---"); + // topic for the namespace1 (to be migrated) + final String topicName = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + // topic for namespace2 (not to be migrated) + final String topicName2 = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespaceNotToMigrate + "/migrationTopic"); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // blue cluster - namespace1 - producer/consumer + Producer blueProducerNs1_1 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("blue-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer blueConsumerNs1_1 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + AbstractTopic blueTopicNs1_1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); + retryStrategically((test) -> !blueTopicNs1_1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !blueTopicNs1_1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(blueTopicNs1_1.getProducers().isEmpty()); + assertFalse(blueTopicNs1_1.getSubscriptions().isEmpty()); + + // blue cluster - namespace2 - producer/consumer + Producer blueProducerNs2_1 = client1.newProducer().topic(topicName2).enableBatching(false) + .producerName("blue-producer-ns2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer blueConsumerNs2_1 = client1.newConsumer().topic(topicName2).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + AbstractTopic blueTopicNs2_1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName2, false).getNow(null).get(); + retryStrategically((test) -> !blueTopicNs2_1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !blueTopicNs2_1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(blueTopicNs2_1.getProducers().isEmpty()); + assertFalse(blueTopicNs2_1.getSubscriptions().isEmpty()); + + // build backlog on the blue cluster + blueConsumerNs1_1.close(); + blueConsumerNs2_1.close(); + int n = 5; + for (int i = 0; i < n; i++) { + blueProducerNs1_1.send("test1".getBytes()); + blueProducerNs2_1.send("test1".getBytes()); + } + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // green cluster - namespace1 - producer/consumer + Producer greenProducerNs1_1 = client2.newProducer().topic(topicName).enableBatching(false) + .producerName("green-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic greenTopicNs1_1 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get(); + assertFalse(greenTopicNs1_1.getProducers().isEmpty()); + + // green cluster - namespace2 - producer/consumer + Producer greenProducerNs2_1 = client2.newProducer().topic(topicName2).enableBatching(false) + .producerName("cluster2-nm1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic greenTopicNs2_1 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName2, false).getNow(null).get(); + assertFalse(greenTopicNs2_1.getProducers().isEmpty()); + + // blue - green cluster migration + ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), + pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + admin1.clusters().updateClusterMigration("r1", isClusterMigrate, migratedUrl); + admin1.namespaces().updateMigrationState(namespace, isNamespaceMigrate); + + retryStrategically((test) -> { + try { + blueTopicNs1_1.checkClusterMigration().get(); + if (isClusterMigrate) { + blueTopicNs2_1.checkClusterMigration().get(); + } + return true; + } catch (Exception e) { + // ok + } + return false; + }, 10, 500); + + + blueTopicNs1_1.checkClusterMigration().get(); + if (isClusterMigrate) { + blueTopicNs2_1.checkClusterMigration().get(); + } + + log.info("before sending message"); + sleep(1000); + blueProducerNs1_1.sendAsync("test1".getBytes()); + blueProducerNs2_1.sendAsync("test1".getBytes()); + + // producer is disconnected from blue for namespace1 as cluster or namespace migration is enabled + retryStrategically((test) -> blueTopicNs1_1.getProducers().isEmpty(), 10, 500); + assertTrue(blueTopicNs1_1.getProducers().isEmpty()); + + if(isClusterMigrate){ + // producer is disconnected from blue for namespace2 if cluster migration is enabled + retryStrategically((test) -> blueTopicNs2_1.getProducers().isEmpty(), 10, 500); + assertTrue(blueTopicNs2_1.getProducers().isEmpty()); + } else { + // producer is not disconnected from blue for namespace2 if namespace migration is disabled + retryStrategically((test) -> !blueTopicNs2_1.getProducers().isEmpty(), 10, 500); + assertTrue(!blueTopicNs2_1.getProducers().isEmpty()); + } + + // create producer on blue which should be redirected to green + Producer blueProducerNs1_2 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("blue-producer-ns1-2").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + // producer is connected with green + retryStrategically((test) -> greenTopicNs1_1.getProducers().size() == 3, 10, 500); + assertTrue(greenTopicNs1_1.getProducers().size() == 3); + + // blueProducerNs2_1 should be migrated to green if the cluster migration is enabled + // should not be migrated if the namespace migration is disabled for namespace2 + if (isClusterMigrate) { + retryStrategically((test) -> greenTopicNs2_1.getProducers().size() == 2, 10, 500); + assertTrue(greenTopicNs2_1.getProducers().size() == 2); + } else{ + retryStrategically((test) -> greenTopicNs2_1.getProducers().size() == 1, 10, 500); + assertTrue(greenTopicNs2_1.getProducers().size() == 1); + } + + // try to consume backlog messages from cluster-1 + blueConsumerNs1_1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); + blueConsumerNs2_1 = client1.newConsumer().topic(topicName2).subscriptionName("s1").subscribe(); + if (persistent) { + for (int i = 0; i < n; i++) { + Message msg = blueConsumerNs1_1.receive(); + assertEquals(msg.getData(), "test1".getBytes()); + blueConsumerNs1_1.acknowledge(msg); + + Message msg2 = blueConsumerNs2_1.receive(); + assertEquals(msg2.getData(), "test1".getBytes()); + blueConsumerNs2_1.acknowledge(msg2); + } + } + // after consuming all messages, consumer should have disconnected + // from blue and reconnect with green + retryStrategically((test) -> !greenTopicNs1_1.getSubscriptions().isEmpty(), 10, 500); + assertFalse(greenTopicNs1_1.getSubscriptions().isEmpty()); + if (isClusterMigrate) { + retryStrategically((test) -> !greenTopicNs2_1.getSubscriptions().isEmpty(), 10, 500); + assertFalse(greenTopicNs2_1.getSubscriptions().isEmpty()); + } else { + retryStrategically((test) -> greenTopicNs2_1.getSubscriptions().isEmpty(), 10, 500); + assertTrue(greenTopicNs2_1.getSubscriptions().isEmpty()); + } + + blueTopicNs1_1.checkClusterMigration().get(); + if (isClusterMigrate) { + blueTopicNs2_1.checkClusterMigration().get(); + } + + ConcurrentOpenHashMap replicators = blueTopicNs1_1.getReplicators(); + replicators.forEach((r, replicator) -> { + assertFalse(replicator.isConnected()); + }); + assertTrue(blueTopicNs1_1.getSubscriptions().isEmpty()); + + if (isClusterMigrate) { + ConcurrentOpenHashMap replicatorsNm = blueTopicNs2_1.getReplicators(); + replicatorsNm.forEach((r, replicator) -> { + assertFalse(replicator.isConnected()); + }); + assertTrue(blueTopicNs2_1.getSubscriptions().isEmpty()); + } else { + ConcurrentOpenHashMap replicatorsNm = blueTopicNs2_1.getReplicators(); + replicatorsNm.forEach((r, replicator) -> { + assertTrue(replicator.isConnected()); + }); + assertFalse(blueTopicNs2_1.getSubscriptions().isEmpty()); + } + + // create a new consumer on blue which should also reconnect to green + Consumer blueConsumerNs1_2 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s2").subscribe(); + Consumer blueConsumerNs2_2 = client1.newConsumer().topic(topicName2).subscriptionType(subType) + .subscriptionName("s2").subscribe(); + retryStrategically((test) -> greenTopicNs1_1.getSubscription("s2") != null, 10, 500); + assertFalse(greenTopicNs1_1.getSubscription("s2").getConsumers().isEmpty()); + if (isClusterMigrate) { + retryStrategically((test) -> greenTopicNs2_1.getSubscription("s2") != null, 10, 500); + assertFalse(greenTopicNs2_1.getSubscription("s2").getConsumers().isEmpty()); + } else { + retryStrategically((test) -> greenTopicNs2_1.getSubscription("s2") == null, 10, 500); + } + + // new sub on migration topic must be redirected immediately + Consumer consumerM = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("sM").subscribe(); + assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers() + .isEmpty()); + consumerM.close(); + + // migrate topic after creating subscription + String newTopicName = topicName + "-new"; + consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(subType) + .subscriptionName("sM").subscribe(); + retryStrategically((t) -> pulsar1.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100); + pulsar1.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get(); + retryStrategically((t) -> + pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent() && + pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM") + .getConsumers().isEmpty(), 5, 100); + assertFalse(pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM").getConsumers() + .isEmpty()); + consumerM.close(); + + // publish messages to cluster-2 and consume them + for (int i = 0; i < n; i++) { + blueProducerNs1_1.send("test2".getBytes()); + blueProducerNs1_2.send("test2".getBytes()); + greenProducerNs1_1.send("test2".getBytes()); + } + log.info("Successfully published messages by migrated producers"); + for (int i = 0; i < n * 3; i++) { + assertEquals(blueConsumerNs1_1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); + assertEquals(blueConsumerNs1_2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); + + } + + // create non-migrated topic which should connect to blue + String diffTopic = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + Consumer consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(subType) + .subscriptionName("s1-d").subscribe(); + Producer producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false) + .producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic topicDiff = (AbstractTopic) pulsar1.getBrokerService().getTopic(diffTopic, false).getNow(null).get(); + assertNotNull(topicDiff); + for (int i = 0; i < n; i++) { + producerDiff.send("diff".getBytes()); + assertEquals(consumerDiff.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes()); + } + + // restart broker-1 + broker1.restart(); + Producer producer4 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer consumer3 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s3").subscribe(); + retryStrategically((test) -> greenTopicNs1_1.getProducers().size() == 4, 10, 500); + assertTrue(greenTopicNs1_1.getProducers().size() == 4); + retryStrategically((test) -> greenTopicNs1_1.getSubscription("s3") != null, 10, 500); + assertFalse(greenTopicNs1_1.getSubscription("s3").getConsumers().isEmpty()); + for (int i = 0; i < n; i++) { + producer4.send("test3".getBytes()); + assertEquals(blueConsumerNs1_1.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes()); + assertEquals(blueConsumerNs1_2.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes()); + assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes()); + } + + log.info("Successfully consumed messages by migrated consumers"); + + // clean up + blueConsumerNs1_1.close(); + blueConsumerNs1_2.close(); + blueConsumerNs2_1.close(); + blueProducerNs1_1.close(); + blueProducerNs1_2.close(); + blueProducerNs2_1.close(); + greenProducerNs1_1.close(); + greenProducerNs2_1.close(); + } + + @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes") + public void testNamespaceMigrationWithReplicationBacklog(boolean persistent, SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { + log.info("--- Starting ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---"); + persistent = true; + // topic for namespace1 (to be migrated) + final String topicName = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + // topic for namespace2 (not to be migrated) + final String topicName2 = BrokerTestUtil + .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespaceNotToMigrate + "/migrationTopic"); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + @Cleanup + PulsarClient client3 = PulsarClient.builder().serviceUrl(url3.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // blue cluster - namespace1 - producer/consumer + Producer blueProducerNs1_1 = client1.newProducer().topic(topicName).enableBatching(false) + .producerName("blue-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer blueConsumerNs1_1 = client1.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + + // blue cluster - namespace2 - producer/consumer + Producer blueProducerNs2_1 = client1.newProducer().topic(topicName2).enableBatching(false) + .producerName("blue-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + Consumer blueConsumerNs2_1 = client1.newConsumer().topic(topicName2).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + + // blue cluster replication consumer namespace1 + Consumer blueConsumerReplicationNs1 = client3.newConsumer().topic(topicName).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + + // blue cluster replication consumer namespace2 + Consumer blueConsumerReplicationNs2 = client3.newConsumer().topic(topicName2).subscriptionType(subType) + .subscriptionName("s1").subscribe(); + + + AbstractTopic blueTopicNs1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); + retryStrategically((test) -> !blueTopicNs1.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !blueTopicNs1.getSubscriptions().isEmpty(), 5, 500); + assertFalse(blueTopicNs1.getProducers().isEmpty()); + assertFalse(blueTopicNs1.getSubscriptions().isEmpty()); + + AbstractTopic blueTopicNs2 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName2, false).getNow(null).get(); + retryStrategically((test) -> !blueTopicNs2.getProducers().isEmpty(), 5, 500); + retryStrategically((test) -> !blueTopicNs2.getSubscriptions().isEmpty(), 5, 500); + assertFalse(blueTopicNs2.getProducers().isEmpty()); + assertFalse(blueTopicNs2.getSubscriptions().isEmpty()); + + // build backlog + blueConsumerNs1_1.close(); + blueConsumerNs2_1.close(); + retryStrategically((test) -> blueTopicNs1.getReplicators().size() == 1, 10, 3000); + assertEquals(blueTopicNs1.getReplicators().size(), 1); + retryStrategically((test) -> blueTopicNs2.getReplicators().size() == 1, 10, 3000); + assertEquals(blueTopicNs2.getReplicators().size(), 1); + + // stop service in the replication cluster to build replication backlog + broker3.cleanup(); + retryStrategically((test) -> broker3.getPulsarService() == null, 10, 1000); + assertNull(pulsar3.getBrokerService()); + + //publish messages into topic in blue cluster + int n = 5; + for (int i = 0; i < n; i++) { + blueProducerNs1_1.send("test1".getBytes()); + blueProducerNs2_1.send("test1".getBytes()); + } + retryStrategically((test) -> blueTopicNs1.isReplicationBacklogExist(), 10, 1000); + assertTrue(blueTopicNs1.isReplicationBacklogExist()); + retryStrategically((test) -> blueTopicNs2.isReplicationBacklogExist(), 10, 1000); + assertTrue(blueTopicNs2.isReplicationBacklogExist()); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + // green cluster - namespace1 - producer/consumer + Producer greenProducerNs1_1 = client2.newProducer().topic(topicName).enableBatching(false) + .producerName("green-producer-ns1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic greenTopicNs1 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get(); + Producer greenProducerNs2_1 = client2.newProducer().topic(topicName2).enableBatching(false) + .producerName("green-producer-ns2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + AbstractTopic greenTopicNs2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName2, false).getNow(null).get(); + log.info("name of topic 2 - {}", greenTopicNs1.getName()); + assertFalse(greenTopicNs1.getProducers().isEmpty()); + + retryStrategically((test) -> greenTopicNs1.getReplicators().size() == 1, 10, 2000); + log.info("replicators should be ready"); + + ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), + pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + admin1.clusters().updateClusterMigration("r1", isClusterMigrate, migratedUrl); + admin1.namespaces().updateMigrationState(namespace, isNamespaceMigrate); + log.info("update cluster migration called"); + + retryStrategically((test) -> { + try { + blueTopicNs1.checkClusterMigration().get(); + if (isClusterMigrate) { + blueTopicNs2.checkClusterMigration().get(); + } + return true; + } catch (Exception e) { + // ok + } + return false; + }, 10, 500); + + blueTopicNs1.checkClusterMigration().get(); + if (isClusterMigrate) { + blueTopicNs2.checkClusterMigration().get(); + } + + blueProducerNs1_1.sendAsync("test1".getBytes()); + blueProducerNs2_1.sendAsync("test1".getBytes()); + + // producer is disconnected from blue + retryStrategically((test) -> blueTopicNs1.getProducers().isEmpty(), 10, 500); + assertTrue(blueTopicNs1.getProducers().isEmpty()); + if (isClusterMigrate) { + retryStrategically((test) -> blueTopicNs2.getProducers().isEmpty(), 10, 500); + assertTrue(blueTopicNs2.getProducers().isEmpty()); + } else { + retryStrategically((test) -> !blueTopicNs2.getProducers().isEmpty(), 10, 500); + assertFalse(blueTopicNs2.getProducers().isEmpty()); + } + + // verify that the disconnected producer is not redirected + // to replication cluster since there is replication backlog. + assertEquals(greenTopicNs1.getProducers().size(), 1); + + // Restart the service in cluster "r3". + broker3.restart(); + retryStrategically((test) -> broker3.getPulsarService() != null, 10, 1000); + assertNotNull(broker3.getPulsarService()); + pulsar3 = broker3.getPulsarService(); + + // verify that the replication backlog drains once service in cluster "r3" is restarted. + retryStrategically((test) -> !blueTopicNs1.isReplicationBacklogExist(), 10, 1000); + assertFalse(blueTopicNs1.isReplicationBacklogExist()); + retryStrategically((test) -> !blueTopicNs2.isReplicationBacklogExist(), 10, 1000); + assertFalse(blueTopicNs2.isReplicationBacklogExist()); + + // verify that the producer1 is now is now connected to migrated cluster green since backlog is cleared. + retryStrategically((test) -> greenTopicNs1.getProducers().size()==2, 10, 500); + assertEquals(greenTopicNs1.getProducers().size(), 2); + if (isClusterMigrate) { + retryStrategically((test) -> greenTopicNs2.getProducers().size()==2, 10, 500); + assertEquals(greenTopicNs2.getProducers().size(), 2); + } else { + retryStrategically((test) -> greenTopicNs2.getProducers().size()==1, 10, 500); + assertEquals(greenTopicNs2.getProducers().size(), 1); + } + + // clean up + blueProducerNs1_1.close(); + blueProducerNs2_1.close(); + blueConsumerNs1_1.close(); + blueConsumerNs2_1.close(); + greenProducerNs1_1.close(); + greenProducerNs2_1.close(); + } + static class TestBroker extends MockedPulsarServiceBaseTest { private String clusterName; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 2690df658b7be..7f31f3e8d2d57 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -1234,7 +1234,7 @@ CompletableFuture> getAntiAffinityNamespacesAsync( * @param namespace * Namespace name * @param enableDeduplication - * wether to enable or disable deduplication feature + * whether to enable or disable deduplication feature */ CompletableFuture setDeduplicationStatusAsync(String namespace, boolean enableDeduplication); @@ -4623,4 +4623,29 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem * @return */ CompletableFuture removeNamespaceEntryFiltersAsync(String namespace); + + /** + * Enable migration for all topics within a namespace. + *

+ * Migrate all topics of a namespace to new broker. + *

+ * Request example: + * + *

+     * true
+     * 
+ * + * @param namespace + * Namespace name + * @param migrated + * Flag to determine namespace is migrated or not + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException; + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 066fdf1df4f09..138e8c4793010 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -126,6 +126,8 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public String resource_group_name = null; + public boolean isMigrated; + public enum BundleType { LARGEST, HOT; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 59f0ef3b34763..05e4352e5fabe 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1900,6 +1900,17 @@ public CompletableFuture removeNamespaceResourceGroupAsync(String namespac return asyncDeleteRequest(path); } + @Override + public void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException { + sync(() -> updateMigrationStateAsync(namespace, migrated)); + } + + public CompletableFuture updateMigrationStateAsync(String namespace, boolean migrated) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "migration"); + return asyncPostRequest(path, Entity.entity(migrated, MediaType.APPLICATION_JSON)); + } + private WebTarget namespacePath(NamespaceName namespace, String... parts) { final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces; WebTarget namespacePath = base.path(namespace.toString()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index c578876b382fb..033146aa607b0 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -159,21 +159,20 @@ private class UpdateClusterMigration extends CliCommand { @Parameter(description = "cluster-name", required = true) private java.util.List params; - @Parameter(names = "--migrated", description = "Is cluster migrated", required = true) + @Parameter(names = "--migrated", description = "Is cluster migrated") private boolean migrated; - @Parameter(names = "--service-url", description = "New migrated cluster service url", required = false) + @Parameter(names = "--service-url", description = "New migrated cluster service url") private String serviceUrl; @Parameter(names = "--service-url-secure", - description = "New migrated cluster service url secure", required = false) + description = "New migrated cluster service url secure") private String serviceUrlTls; - @Parameter(names = "--broker-url", description = "New migrated cluster broker service url", required = false) + @Parameter(names = "--broker-url", description = "New migrated cluster broker service url") private String brokerServiceUrl; - @Parameter(names = "--broker-url-secure", description = "New migrated cluster broker service url secure", - required = false) + @Parameter(names = "--broker-url-secure", description = "New migrated cluster broker service url secure") private String brokerServiceUrlTls; void run() throws PulsarAdminException { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 33277fdb60839..8162b4b19c21d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -2619,6 +2619,20 @@ void run() throws PulsarAdminException { getAdmin().namespaces().removeNamespaceResourceGroup(namespace); } } + @Parameters(commandDescription = "Update migration state for a namespace") + private class UpdateMigrationState extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Parameter(names = "--migrated", description = "Is namespace migrated") + private boolean migrated; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().updateMigrationState(namespace, migrated); + } + } @Parameters(commandDescription = "Get entry filters for a namespace") private class GetEntryFiltersPerTopic extends CliCommand { @@ -2844,5 +2858,7 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("get-entry-filters", new GetEntryFiltersPerTopic()); jcommander.addCommand("set-entry-filters", new SetEntryFiltersPerTopic()); jcommander.addCommand("remove-entry-filters", new RemoveEntryFiltersPerTopic()); + + jcommander.addCommand("update-migration-state", new UpdateMigrationState()); } }