diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java index 213296d22833a..722073c38d7dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java @@ -20,30 +20,34 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import lombok.Cleanup; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.Murmur3_32Hash; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; @Test(groups = "broker-impl") public class KeySharedSubscriptionTest extends ProducerConsumerBase { @@ -81,80 +85,50 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc AtomicLong lastActiveTime = new AtomicLong(); AtomicBoolean canAcknowledgement = new AtomicBoolean(false); - @Cleanup - Consumer consumer1 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .consumerName("con-1") - .messageListener((cons1, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons1.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .subscribe(); - @Cleanup - Consumer consumer2 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons2, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons2.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); + List> consumerList = new ArrayList<>(); + // create 3 consumers + for (int i = 0; i < 3; i++) { + ConsumerBuilder builder = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub-1") + .subscriptionType(subscriptionType) + .messageListener((consumer, msg) -> { + lastActiveTime.set(System.currentTimeMillis()); + nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>()) + .add(msg.getMessageId()); + recMessages.add(msg.getMessageId()); + if (canAcknowledgement.get()) { + try { + consumer.acknowledge(msg); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } } - } - }) - .consumerName("con-2") - .subscribe(); - @Cleanup - Consumer consumer3 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons3, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons3.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .consumerName("con-3") - .subscribe(); + }); + + if (subscriptionType == SubscriptionType.Key_Shared) { + // ensure every consumer can be distributed messages + int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes()) + % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE; + builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash))); + } + + consumerList.add(builder.subscribe()); + } - @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // We chose 9 because the maximum unacked message is 10 .batchingMaxMessages(9) + .batcherBuilder(BatcherBuilder.KEY_BASED) .create(); for (int i = 0; i < totalMsg; i++) { - producer.sendAsync(UUID.randomUUID().toString() - .getBytes(StandardCharsets.UTF_8)) - .thenAccept(pubMessages::add); + byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + producer.newMessage().key("key-" + (i % 3)).value(msg) + .sendAsync().thenAccept(pubMessages::add); } // Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages. @@ -176,7 +150,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc // Wait for all consumers to continue receiving messages. Awaitility.await() - .atMost(30, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) .pollDelay(5, TimeUnit.SECONDS) .until(() -> (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5)); @@ -186,5 +160,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc Assert.assertEquals(pubMessages.size(), totalMsg); Assert.assertEquals(pubMessages.size(), recMessages.size()); Assert.assertTrue(recMessages.containsAll(pubMessages)); + + // cleanup + producer.close(); + for (Consumer consumer : consumerList) { + consumer.close(); + } } }