Skip to content

Commit

Permalink
[fix][test] flaky test `testCanRecoverConsumptionWhenLiftMaxUnAckedMe…
Browse files Browse the repository at this point in the history
…ssagesRestriction` (apache#18726)
  • Loading branch information
labuladong authored and Demogorgon314 committed Dec 29, 2022
1 parent 2f11169 commit 6592c7b
Showing 1 changed file with 45 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -77,80 +81,50 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
AtomicLong lastActiveTime = new AtomicLong();
AtomicBoolean canAcknowledgement = new AtomicBoolean(false);

@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.consumerName("con-1")
.messageListener((cons1, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons1.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.subscribe();
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons2, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons2.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.consumerName("con-2")
.subscribe();
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons3, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons3.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
List<Consumer<?>> consumerList = new ArrayList<>();
// create 3 consumers
for (int i = 0; i < 3; i++) {
ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((consumer, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}
})
.consumerName("con-3")
.subscribe();
});

if (subscriptionType == SubscriptionType.Key_Shared) {
// ensure every consumer can be distributed messages
int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash)));
}

consumerList.add(builder.subscribe());
}

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
// We chose 9 because the maximum unacked message is 10
.batchingMaxMessages(9)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();

for (int i = 0; i < totalMsg; i++) {
producer.sendAsync(UUID.randomUUID().toString()
.getBytes(StandardCharsets.UTF_8))
.thenAccept(pubMessages::add);
byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
producer.newMessage().key("key-" + (i % 3)).value(msg)
.sendAsync().thenAccept(pubMessages::add);
}

// Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages.
Expand All @@ -172,6 +146,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc

// Wait for all consumers to continue receiving messages.
Awaitility.await()
.atMost(15, TimeUnit.SECONDS)
.pollDelay(5, TimeUnit.SECONDS)
.until(() ->
(System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));
Expand All @@ -181,5 +156,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
Assert.assertEquals(pubMessages.size(), totalMsg);
Assert.assertEquals(pubMessages.size(), recMessages.size());
Assert.assertTrue(recMessages.containsAll(pubMessages));

// cleanup
producer.close();
for (Consumer<?> consumer : consumerList) {
consumer.close();
}
}
}

0 comments on commit 6592c7b

Please sign in to comment.