diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index f5f088926a091..732a6af325b2f 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -44,6 +44,7 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData; +import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.ConsumerStats; @@ -185,9 +186,6 @@ public Pair sendMessages(final List entries) { readChecksum(metadataAndPayload); } - // stats - msgOut.recordEvent(metadataAndPayload.readableBytes()); - if (log.isDebugEnabled()) { log.debug("[{}] Sending message to consumerId {}, entry id {}", subscription, consumerId, pos.getEntryId()); @@ -238,6 +236,7 @@ int updatePermitsAndPendingAcks(final List entries) throws PulsarServerEx int permitsToReduce = 0; Iterator iter = entries.iterator(); boolean unsupportedVersion = false; + long totalReadableBytes = 0; boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion(); while (iter.hasNext()) { Entry entry = iter.next(); @@ -258,6 +257,7 @@ int updatePermitsAndPendingAcks(final List entries) throws PulsarServerEx if (batchSize > 1 && !clientSupportBatchMessages) { unsupportedVersion = true; } + totalReadableBytes += metadataAndPayload.readableBytes(); permitsToReduce += batchSize; } // reduce permit and increment unackedMsg count with total number of messages in batch-msgs @@ -271,6 +271,8 @@ int updatePermitsAndPendingAcks(final List entries) throws PulsarServerEx log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits); } } + + msgOut.recordMultipleEvents(permitsToReduce, totalReadableBytes); return permitsToReduce; } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java index 1470b097ec271..5982483008eba 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -18,6 +18,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.util.List; @@ -27,6 +28,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +39,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.RateLimiter; +import com.yahoo.pulsar.client.admin.PulsarAdminException; import com.yahoo.pulsar.client.impl.ConsumerStats; import com.yahoo.pulsar.client.impl.ProducerStats; @@ -323,6 +327,39 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception { assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent()); log.info("-- Exiting {} test --", methodName); } + + public void testBatchMessagesRateOut() throws PulsarClientException, InterruptedException, PulsarAdminException { + log.info("-- Starting {} test --", methodName); + String topicName = "persistent://my-property/cluster/my-ns/testBatchMessagesRateOut"; + double produceRate = 17; + int batchSize = 5; + ConsumerConfiguration consumerConf = new ConsumerConfiguration(); + consumerConf.setSubscriptionType(SubscriptionType.Exclusive); + Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", consumerConf); + ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingMaxMessages(batchSize); + producerConf.setBatchingEnabled(true); + producerConf.setBatchingMaxPublishDelay(2, TimeUnit.SECONDS); + + Producer producer = pulsarClient.createProducer(topicName, producerConf); + AtomicBoolean runTest = new AtomicBoolean(true); + Thread t1 = new Thread(() -> { + RateLimiter r = RateLimiter.create(produceRate); + while (runTest.get()) { + r.acquire(); + producer.sendAsync("Hello World".getBytes()); + consumer.receiveAsync().thenAccept(message -> consumer.acknowledgeAsync(message)); + } + }); + t1.start(); + Thread.sleep(2000); // Two seconds sleep + runTest.set(false); + pulsar.getBrokerService().updateRates(); + double actualRate = admin.persistentTopics().getStats(topicName).msgRateOut; + assertTrue(actualRate > (produceRate / batchSize)); + consumer.unsubscribe(); + log.info("-- Exiting {} test --", methodName); + } public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount) throws InterruptedException {