diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index e6a7d049366e4..5fe26b3bb45eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; @@ -260,8 +261,15 @@ public void beforeMethod(Method m) throws Exception { * (11) Restart Broker-1 and connect producer/consumer on cluster-1 * @throws Exception */ - @Test - public void testClusterMigration() throws Exception { + public void testClusterMigrationDurable() throws Exception { + clusterMigrationTest(SubscriptionMode.Durable); + } + + public void testClusterMigrationNonDurable() throws Exception { + clusterMigrationTest(SubscriptionMode.NonDurable); + } + + private void clusterMigrationTest(SubscriptionMode consumer1subscriptionMode) throws Exception { log.info("--- Starting ReplicatorTest::testClusterMigration ---"); final String topicName = BrokerTestUtil .newUniqueName("persistent://" + namespace + "/migrationTopic"); @@ -272,7 +280,9 @@ public void testClusterMigration() throws Exception { // cluster-1 producer/consumer Producer producer1 = client1.newProducer().topic(topicName).enableBatching(false) .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); - Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + Consumer consumer1 = client1.newConsumer().topic(topicName) + .subscriptionMode(consumer1subscriptionMode) + .subscriptionType(SubscriptionType.Shared) .subscriptionName("s1").subscribe(); AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); @@ -280,7 +290,7 @@ public void testClusterMigration() throws Exception { assertFalse(topic1.getProducers().isEmpty()); assertFalse(topic1.getSubscriptions().isEmpty()); - // build backlog + // build backlog if Durable consumer1.close(); int n = 5; for (int i = 0; i < n; i++) { @@ -330,24 +340,30 @@ public void testClusterMigration() throws Exception { retryStrategically((test) -> topic2.getProducers().size() == 3, 10, 500); assertTrue(topic2.getProducers().size() == 3); - // try to consume backlog messages from cluster-1 consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); - for (int i = 0; i < n; i++) { - Message msg = consumer1.receive(); - assertEquals(msg.getData(), "test1".getBytes()); - consumer1.acknowledge(msg); + if(consumer1subscriptionMode == SubscriptionMode.Durable) { + // try to consume backlog messages from cluster-1 + for (int i = 0; i < n; i++) { + Message msg = consumer1.receive(); + assertEquals(msg.getData(), "test1".getBytes()); + consumer1.acknowledge(msg); + } + // after consuming all messages, consumer should have disconnected + // from cluster-1 and reconnect with cluster-2 + retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500); + assertFalse(topic2.getSubscriptions().isEmpty()); + } else { + // If we had a non-durable subscription there will be no backlog + // Just confirm the topic exists in the second cluster + Optional topic = pulsar2.getBrokerService().getTopic(topicName, false).get(); + assertTrue(topic.isPresent()); } - // after consuming all messages, consumer should have disconnected - // from cluster-1 and reconnect with cluster-2 - retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500); - assertFalse(topic2.getSubscriptions().isEmpty()); topic1.checkClusterMigration().get(); final var replicators = topic1.getReplicators(); replicators.forEach((r, replicator) -> { assertFalse(replicator.isConnected()); }); - assertTrue(topic1.getSubscriptions().isEmpty()); // not also create a new consumer which should also reconnect to cluster-2 @@ -387,7 +403,6 @@ public void testClusterMigration() throws Exception { for (int i = 0; i < n * 3; i++) { assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes()); - } // create non-migrated topic which should connect to cluster-1