Skip to content

Commit

Permalink
Fix incoming message size issue that introduced in apache#9113 (apach…
Browse files Browse the repository at this point in the history
…e#9182)

### Motivation

Fix incoming message size issue that introduced in apache#9113. We should decrease the incoming message size when taking messages from the queue and increase the incoming message size while adding messages to the queue. With apache#9113, will always increase the incoming queue size.

### Modifications

Add method `increaseIncomingSize` and `decreaseIncomingSize`

### Verifying this change

Add a new test for verifying the incoming message size should be zero while the incoming queue size is zero.
  • Loading branch information
codelipenghui authored and merlimat committed Feb 10, 2021
1 parent d44dcbd commit a950cec
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
Expand Down Expand Up @@ -3693,4 +3694,54 @@ public void testGetStatsForPartitionedTopic() throws Exception {
consumer.close();
producer.close();
}

@DataProvider(name = "partitioned")
public static Object[] isPartitioned() {
return new Object[] {false, true};
}

@Test(dataProvider = "partitioned")
public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" +
UUID.randomUUID().toString();
final String subName = "my-sub";

if (isPartitioned) {
admin.topics().createPartitionedTopic(topicName, 3);
}

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

final int messages = 100;
List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
}
FutureUtil.waitForAll(messageIds).get();

Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should greater that 0, current size is {}", size);
Assert.assertTrue(size > 0);
});

for (int i = 0; i < messages; i++) {
consumer.acknowledge(consumer.receive());
}

Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should be 0, current size is {}", size);
Assert.assertEquals(size, 0);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -664,8 +665,7 @@ protected boolean canEnqueueMessage(Message<T> message) {

protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
this, message.getData() == null ? 0 : message.getData().length);
increaseIncomingMessageSize(message);
}
return hasEnoughMessagesForBatchReceive();
}
Expand All @@ -675,7 +675,7 @@ protected boolean hasEnoughMessagesForBatchReceive() {
return false;
}
return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
|| (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes());
|| (batchReceivePolicy.getMaxNumBytes() > 0 && getIncomingMessageSize() >= batchReceivePolicy.getMaxNumBytes());
}

private void verifyConsumerState() throws PulsarClientException {
Expand Down Expand Up @@ -847,13 +847,22 @@ protected boolean hasPendingBatchReceive() {
return pendingBatchReceives != null && peekNextBatchReceive() != null;
}

protected void increaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
this, message.getData() == null ? 0 : message.getData().length);
}

protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}

protected void updateIncomingMessageSize(final Message<?> message) {
protected void decreaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
(message.getData() != null) ? message.getData().length : 0);
(message.getData() != null) ? -message.getData().length : 0);
}

public long getIncomingMessageSize() {
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
stats.updateNumMsgsReceived(msg);

trackMessage(msg);
updateIncomingMessageSize(msg);
decreaseIncomingMessageSize(msg);
}

protected void trackMessage(Message<?> msg) {
Expand Down Expand Up @@ -2197,7 +2197,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
while (message != null) {
updateIncomingMessageSize(message);
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdImpl id = getMessageIdImpl(message);
if (!messageIds.contains(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
@Override
protected synchronized void messageProcessed(Message<?> msg) {
unAckedMessageTracker.add(msg.getMessageId());
updateIncomingMessageSize(msg);
decreaseIncomingMessageSize(msg);
}

private void resumeReceivingFromPausedConsumersIfNeeded() {
Expand All @@ -335,7 +335,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
updateIncomingMessageSize(message);
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
Expand All @@ -351,7 +351,7 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
try {
message = incomingMessages.poll(timeout, unit);
if (message != null) {
updateIncomingMessageSize(message);
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
}
Expand Down Expand Up @@ -392,7 +392,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = incomingMessages.poll();
if (msg != null) {
updateIncomingMessageSize(msg);
decreaseIncomingMessageSize(msg);
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
Expand Down Expand Up @@ -420,7 +420,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
updateIncomingMessageSize(message);
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
Expand Down Expand Up @@ -785,7 +785,7 @@ private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
Message<T> message = incomingMessages.poll();
checkState(message instanceof TopicMessageImpl);
while (message != null) {
updateIncomingMessageSize(message);
decreaseIncomingMessageSize(message);
MessageId messageId = message.getMessageId();
if (!messageIds.contains(messageId)) {
messageIds.add(messageId);
Expand Down

0 comments on commit a950cec

Please sign in to comment.