diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 55dab8ba47cb2..8eda0d0814bea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -77,6 +77,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; @@ -3695,4 +3696,54 @@ public void testGetStatsForPartitionedTopic() throws Exception { consumer.close(); producer.close(); } + + @DataProvider(name = "partitioned") + public static Object[] isPartitioned() { + return new Object[] {false, true}; + } + + @Test(dataProvider = "partitioned") + public void testIncomingMessageSize(boolean isPartitioned) throws Exception { + final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" + + UUID.randomUUID().toString(); + final String subName = "my-sub"; + + if (isPartitioned) { + admin.topics().createPartitionedTopic(topicName, 3); + } + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + final int messages = 100; + List> messageIds = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync()); + } + FutureUtil.waitForAll(messageIds).get(); + + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + long size = ((ConsumerBase) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should greater that 0, current size is {}", size); + Assert.assertTrue(size > 0); + }); + + for (int i = 0; i < messages; i++) { + consumer.acknowledge(consumer.receive()); + } + + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + long size = ((ConsumerBase) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + }); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index c184f9ad9a2d8..dc4d2c106797b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import java.util.Collections; import java.util.List; @@ -668,8 +669,7 @@ protected boolean canEnqueueMessage(Message message) { protected boolean enqueueMessageAndCheckBatchReceive(Message message) { if (canEnqueueMessage(message) && incomingMessages.offer(message)) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet( - this, message.getData() == null ? 0 : message.getData().length); + increaseIncomingMessageSize(message); } return hasEnoughMessagesForBatchReceive(); } @@ -679,7 +679,7 @@ protected boolean hasEnoughMessagesForBatchReceive() { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) - || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); + || (batchReceivePolicy.getMaxNumBytes() > 0 && getIncomingMessageSize() >= batchReceivePolicy.getMaxNumBytes()); } private void verifyConsumerState() throws PulsarClientException { @@ -851,13 +851,22 @@ protected boolean hasPendingBatchReceive() { return pendingBatchReceives != null && peekNextBatchReceive() != null; } + protected void increaseIncomingMessageSize(final Message message) { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet( + this, message.getData() == null ? 0 : message.getData().length); + } + protected void resetIncomingMessageSize() { INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); } - protected void updateIncomingMessageSize(final Message message) { + protected void decreaseIncomingMessageSize(final Message message) { INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, - (message.getData() != null) ? message.getData().length : 0); + (message.getData() != null) ? -message.getData().length : 0); + } + + public long getIncomingMessageSize() { + return INCOMING_MESSAGES_SIZE_UPDATER.get(this); } protected abstract void completeOpBatchReceive(OpBatchReceive op); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b5c3c0136f580..0473e23e8f466 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1528,7 +1528,7 @@ protected synchronized void messageProcessed(Message msg) { stats.updateNumMsgsReceived(msg); trackMessage(msg); - updateIncomingMessageSize(msg); + decreaseIncomingMessageSize(msg); } protected void trackMessage(Message msg) { @@ -2222,7 +2222,7 @@ private int removeExpiredMessagesFromQueue(Set messageIds) { // try not to remove elements that are added while we remove Message message = incomingMessages.poll(); while (message != null) { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); messagesFromQueue++; MessageIdImpl id = getMessageIdImpl(message); if (!messageIds.contains(id)) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f59c2c698870e..4149b39ca6a46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -311,7 +311,7 @@ private void messageReceived(ConsumerImpl consumer, Message message) { @Override protected synchronized void messageProcessed(Message msg) { unAckedMessageTracker.add(msg.getMessageId()); - updateIncomingMessageSize(msg); + decreaseIncomingMessageSize(msg); } private void resumeReceivingFromPausedConsumersIfNeeded() { @@ -334,7 +334,7 @@ protected Message internalReceive() throws PulsarClientException { Message message; try { message = incomingMessages.take(); - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -350,7 +350,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarCl try { message = incomingMessages.poll(timeout, unit); if (message != null) { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); } @@ -391,7 +391,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { while (msgPeeked != null && messages.canAdd(msgPeeked)) { Message msg = incomingMessages.poll(); if (msg != null) { - updateIncomingMessageSize(msg); + decreaseIncomingMessageSize(msg); Message interceptMsg = beforeConsume(msg); messages.add(interceptMsg); } @@ -419,7 +419,7 @@ protected CompletableFuture> internalReceiveAsync() { pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -784,7 +784,7 @@ private void removeExpiredMessagesFromQueue(Set messageIds) { Message message = incomingMessages.poll(); checkState(message instanceof TopicMessageImpl); while (message != null) { - updateIncomingMessageSize(message); + decreaseIncomingMessageSize(message); MessageId messageId = message.getMessageId(); if (!messageIds.contains(messageId)) { messageIds.add(messageId);