Skip to content

Commit

Permalink
Try to reproduce issue with nondurable consumers impacting topic migr…
Browse files Browse the repository at this point in the history
…ation across clusters.
  • Loading branch information
frankjkelly committed Nov 5, 2024
1 parent ce0c1bb commit 0cb06e2
Showing 1 changed file with 30 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
Expand Down Expand Up @@ -260,8 +261,15 @@ public void beforeMethod(Method m) throws Exception {
* (11) Restart Broker-1 and connect producer/consumer on cluster-1
* @throws Exception
*/
@Test
public void testClusterMigration() throws Exception {
public void testClusterMigrationDurable() throws Exception {
clusterMigrationTest(SubscriptionMode.Durable);
}

public void testClusterMigrationNonDurable() throws Exception {
clusterMigrationTest(SubscriptionMode.NonDurable);
}

private void clusterMigrationTest(SubscriptionMode consumer1subscriptionMode) throws Exception {
log.info("--- Starting ReplicatorTest::testClusterMigration ---");
final String topicName = BrokerTestUtil
.newUniqueName("persistent://" + namespace + "/migrationTopic");
Expand All @@ -272,15 +280,17 @@ public void testClusterMigration() throws Exception {
// cluster-1 producer/consumer
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName)
.subscriptionMode(consumer1subscriptionMode)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(topic1.getProducers().isEmpty());
assertFalse(topic1.getSubscriptions().isEmpty());

// build backlog
// build backlog if Durable
consumer1.close();
int n = 5;
for (int i = 0; i < n; i++) {
Expand Down Expand Up @@ -330,24 +340,30 @@ public void testClusterMigration() throws Exception {
retryStrategically((test) -> topic2.getProducers().size() == 3, 10, 500);
assertTrue(topic2.getProducers().size() == 3);

// try to consume backlog messages from cluster-1
consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
for (int i = 0; i < n; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test1".getBytes());
consumer1.acknowledge(msg);
if(consumer1subscriptionMode == SubscriptionMode.Durable) {
// try to consume backlog messages from cluster-1
for (int i = 0; i < n; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test1".getBytes());
consumer1.acknowledge(msg);
}
// after consuming all messages, consumer should have disconnected
// from cluster-1 and reconnect with cluster-2
retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic2.getSubscriptions().isEmpty());
} else {
// If we had a non-durable subscription there will be no backlog
// Just confirm the topic exists in the second cluster
Optional<Topic> topic = pulsar2.getBrokerService().getTopic(topicName, false).get();
assertTrue(topic.isPresent());
}
// after consuming all messages, consumer should have disconnected
// from cluster-1 and reconnect with cluster-2
retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic2.getSubscriptions().isEmpty());

topic1.checkClusterMigration().get();
final var replicators = topic1.getReplicators();
replicators.forEach((r, replicator) -> {
assertFalse(replicator.isConnected());
});

assertTrue(topic1.getSubscriptions().isEmpty());

// not also create a new consumer which should also reconnect to cluster-2
Expand Down Expand Up @@ -387,7 +403,6 @@ public void testClusterMigration() throws Exception {
for (int i = 0; i < n * 3; i++) {
assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());

}

// create non-migrated topic which should connect to cluster-1
Expand Down

0 comments on commit 0cb06e2

Please sign in to comment.