[fix] [client] fix huge permits if acked a half batched message (apac…
poorbarcode authored Feb 26, 2024
1 parent bbc6224 commit 0c49cac
Showing 3 changed files with 99 additions and 2 deletions.
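
Below is a minimal standalone sketch of the permit accounting this commit fixes (my own illustration, not code from the commit: it uses java.util.BitSet instead of Pulsar's BitSetRecyclable, and the class and variable names are hypothetical). When a batch message is redelivered after some of its single messages were already acked, the broker does not deliver those singles and, per the in-diff comment, never decreased its permits for them; the old client code still counted them as "skipped" messages, so it requested replacement permits it had never been charged, inflating the available permits.

import java.util.BitSet;

public class PermitsAccountingSketch {

    // Mirrors the new ConsumerBase helper: a cleared bit in the batch ack set means the
    // single message at this index was already acknowledged.
    static boolean isSingleMessageAcked(BitSet ackBitSet, int batchIndex) {
        return ackBitSet != null && !ackBitSet.get(batchIndex);
    }

    public static void main(String[] args) {
        int batchSize = 100;

        // Ack set of a redelivered batch: the first 2 single messages were acked earlier,
        // so only bits [2, 100) remain set.
        BitSet ackBitSet = new BitSet(batchSize);
        ackBitSet.set(2, batchSize);

        int skippedOld = 0; // before the fix: every undelivered single message was counted
        int skippedNew = 0; // after the fix: already-acked single messages are excluded
        for (int i = 0; i < batchSize; i++) {
            if (isSingleMessageAcked(ackBitSet, i)) {
                skippedOld++;
                // The broker never decreased its permits for this single message, so the
                // client must not request a replacement permit on its behalf.
            }
        }

        // Prints "old skipped=2, new skipped=0": the old accounting requested 2 permits the
        // broker had never charged, which inflates the available permits on every redelivery.
        System.out.println("old skipped=" + skippedOld + ", new skipped=" + skippedNew);
    }
}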
@@ -583,4 +583,89 @@ private org.apache.pulsar.broker.service.Consumer makeConsumerReceiveMessagesDel
consumerSet.add(spyServiceConsumer);
return originalConsumer;
}

/**
* 1. Send a batch message that contains 100 single messages.
* 2. Ack 2 of the single messages.
* 3. Redeliver the batch message and ack the remaining messages.
* 4. Verify that the permits are correct.
*/
@Test
public void testPermitsIfHalfAckBatchMessage() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
final String subName = "s1";
final int receiverQueueSize = 1000;
final int ackedMessagesCountInTheFirstStep = 2;
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subName, MessageId.earliest);
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.receiverQueueSize(receiverQueueSize)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true);

// Send 100 messages.
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create();
CompletableFuture<MessageId> lastSent = null;
for (int i = 1; i <= 100; i++) {
lastSent = producer.sendAsync(i + "");
}
producer.flush();
lastSent.join();

// Ack 2 messages, and trigger a redelivery.
Consumer<String> consumer1 = consumerBuilder.subscribe();
for (int i = 0; i < ackedMessagesCountInTheFirstStep; i++) {
Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
assertNotNull(msg);
consumer1.acknowledge(msg);
}
consumer1.close();

// Receive the remaining 98 messages and ack them.
// Verify that the permits are correct.
ConsumerImpl<String> consumer2 = (ConsumerImpl<String>) consumerBuilder.subscribe();
Awaitility.await().until(() -> consumer2.isConnected());
List<MessageId> messages = new ArrayList<>();
int nextMessageValue = ackedMessagesCountInTheFirstStep + 1;
while (true) {
Message<String> msg = consumer2.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
assertEquals(msg.getValue(), nextMessageValue + "");
messages.add(msg.getMessageId());
nextMessageValue++;
}
assertEquals(messages.size(), 98);
consumer2.acknowledge(messages);

org.apache.pulsar.broker.service.Consumer serviceConsumer2 =
getTheUniqueServiceConsumer(topicName, subName);
Awaitility.await().untilAsserted(() -> {
// After the messages were popped out, the permits in the client memory went up to 98.
int permitsInClientMemory = consumer2.getAvailablePermits();
int permitsInBroker = serviceConsumer2.getAvailablePermits();
assertEquals(permitsInClientMemory + permitsInBroker, receiverQueueSize);
});

// cleanup.
producer.close();
consumer2.close();
admin.topics().delete(topicName, false);
}

private org.apache.pulsar.broker.service.Consumer getTheUniqueServiceConsumer(String topic, String sub) {
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) persistentTopic.getSubscription(sub).getDispatcher();
return dispatcher.getConsumers().iterator().next();
}
}
@@ -64,6 +64,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
@@ -1276,6 +1277,10 @@ protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
return true;
}

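/**
* In the ack set of a batch message, a set bit marks a single message that still needs to be
* delivered, while a cleared bit marks one that has already been acknowledged; a null ack set
* means none of the single messages in the batch have been acked.
*/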
protected boolean isSingleMessageAcked(BitSetRecyclable ackBitSet, int batchIndex) {
return ackBitSet != null && !ackBitSet.get(batchIndex);
}

public boolean hasBatchReceiveTimeout() {
return batchReceiveTimeout != null;
}
@@ -1192,7 +1192,7 @@ protected <V> MessageImpl<V> newSingleMessage(final int index,
return null;
}

if (ackBitSet != null && !ackBitSet.get(index)) {
if (isSingleMessageAcked(ackBitSet, index)) {
return null;
}

@@ -1643,7 +1643,14 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
singleMessageMetadata, uncompressedPayload, batchMessage, schema, true,
ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch);
if (message == null) {
skippedMessages++;
// If the bit for this single message is cleared in ackBitSet, the message was already acked,
// so the broker did not intend to deliver it to the client and did not decrease the permits
// on the broker side. Therefore, do not acquire more permits for this message.
// Why not skip this single message at the top of the for-loop? Because "newSingleMessage"
// still has to be called to advance "payload.readerIndex" so that the following single
// messages are read from the correct position.
if (!isSingleMessageAcked(ackBitSet, i)) {
skippedMessages++;
}
continue;
}
if (possibleToDeadLetter != null) {
