Skip to content

Commit

Permalink
[fix][broker] Fix incorrect unack msk count when dup ack a message (#8)
Browse files Browse the repository at this point in the history
* [fix][broker] Fix incorrect unack msk count when dup ack a message (apache#20990)

(cherry picked from commit 4facdad)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>

* [fix][test] flaky test `testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction` (apache#18726)

(cherry picked from commit 2d205c9)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>

---------

Co-authored-by: Jiwei Guo <technoboy@apache.org>
Co-authored-by: labuladong <labuladong@foxmail.com>
  • Loading branch information
3 people committed Mar 15, 2024
1 parent c46cd4f commit f7f7c96
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
boolean individualAck = false;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
Expand All @@ -467,14 +468,18 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
individualAck = true;
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

if (individualAck) {
if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
} else {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
positionsAcked.add(position);

checkCanRemovePendingAcksAndHandle(position, msgId);

checkAckValidationError(ack, position);

totalAckCount += ackedCount;
Expand Down Expand Up @@ -636,10 +641,11 @@ private void checkAckValidationError(CommandAck ack, PositionImpl position) {
}
}

private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
removePendingAcks(position);
return removePendingAcks(position);
}
return false;
}

private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
Expand Down Expand Up @@ -886,7 +892,7 @@ public int hashCode() {
*
* @param position
*/
private void removePendingAcks(PositionImpl position) {
private boolean removePendingAcks(PositionImpl position) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
for (Consumer consumer : subscription.getConsumers()) {
Expand All @@ -907,7 +913,7 @@ private void removePendingAcks(PositionImpl position) {
if (ackedPosition != null) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return;
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
Expand All @@ -921,7 +927,9 @@ private void removePendingAcks(PositionImpl position) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}
return false;
}

public ConcurrentLongLongPairHashMap getPendingAcks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1435,4 +1435,30 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
assertTrue(conf.isForceDeleteTenantAllowed());
});
}

@Test
public void testDuplicateAcknowledgement() throws Exception {
final String ns = "prop/ns-test";

admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/duplicated-acknowledgement-test";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub-1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.subscribe();
producer.send("1".getBytes(StandardCharsets.UTF_8));
Message<byte[]> message = consumer1.receive();
consumer1.acknowledge(message);
consumer1.acknowledge(message);
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
.get("sub-1").getUnackedMessages(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,34 @@

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

@Test(groups = "broker-impl")
public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -81,80 +85,50 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
AtomicLong lastActiveTime = new AtomicLong();
AtomicBoolean canAcknowledgement = new AtomicBoolean(false);

@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.consumerName("con-1")
.messageListener((cons1, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons1.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.subscribe();
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons2, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons2.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
List<Consumer<?>> consumerList = new ArrayList<>();
// create 3 consumers
for (int i = 0; i < 3; i++) {
ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((consumer, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}
})
.consumerName("con-2")
.subscribe();
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons3, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons3.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.consumerName("con-3")
.subscribe();
});

if (subscriptionType == SubscriptionType.Key_Shared) {
// ensure every consumer can be distributed messages
int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash)));
}

consumerList.add(builder.subscribe());
}

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
// We chose 9 because the maximum unacked message is 10
.batchingMaxMessages(9)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();

for (int i = 0; i < totalMsg; i++) {
producer.sendAsync(UUID.randomUUID().toString()
.getBytes(StandardCharsets.UTF_8))
.thenAccept(pubMessages::add);
byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
producer.newMessage().key("key-" + (i % 3)).value(msg)
.sendAsync().thenAccept(pubMessages::add);
}

// Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages.
Expand All @@ -176,7 +150,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc

// Wait for all consumers to continue receiving messages.
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.atMost(15, TimeUnit.SECONDS)
.pollDelay(5, TimeUnit.SECONDS)
.until(() ->
(System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));
Expand All @@ -186,5 +160,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
Assert.assertEquals(pubMessages.size(), totalMsg);
Assert.assertEquals(pubMessages.size(), recMessages.size());
Assert.assertTrue(recMessages.containsAll(pubMessages));

// cleanup
producer.close();
for (Consumer<?> consumer : consumerList) {
consumer.close();
}
}
}

0 comments on commit f7f7c96

Please sign in to comment.