diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java index a06085d3d4626..9f881f720a050 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java @@ -18,24 +18,21 @@ */ package org.apache.pulsar.client.impl; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.ServerCnx; -import org.apache.pulsar.client.api.BatcherBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.awaitility.Awaitility; @@ -230,4 +227,44 @@ public void testRetentionPolicyByProducingMessages() throws Exception { assertEquals(internalStats.ledgers.size(), 1); }); } + + + @Test + public void testProducerCompressionMinMsgBodySize() throws PulsarClientException { + byte[] msg1022 = new byte[1022]; + byte[] msg1025 = new byte[1025]; + AtomicBoolean isCompressed = new AtomicBoolean(false); + final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic(topicName) + .create(); + producer = spy(producer); + doAnswer(invocation -> { + isCompressed.set(false); + return null; + }).when(producer).applyCompression(any()); + + producer.conf.setCompressMinMsgBodySize(1024); + producer.conf.setCompressionType(CompressionType.LZ4); + // disable batch + producer.conf.setBatchingEnabled(false); + isCompressed.set(false); + producer.newMessage().value(msg1022).send(); + assertFalse(isCompressed.get()); + + isCompressed.set(false); + producer.newMessage().value(msg1025).send(); + assertTrue(isCompressed.get()); + + // enable batch + producer.conf.setBatchingEnabled(true); + isCompressed.set(false); + producer.newMessage().value(msg1022).send(); + assertFalse(isCompressed.get()); + + isCompressed.set(false); + producer.newMessage().value(msg1025).send(); + assertTrue(isCompressed.get()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b7a27b034b271..4197d24eb3083 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -477,7 +477,8 @@ CompletableFuture internalSendWithTxnAsync(Message message, Transa * @param payload * @return a new payload */ - protected ByteBuf applyCompression(ByteBuf payload) { + @VisibleForTesting + public ByteBuf applyCompression(ByteBuf payload) { ByteBuf compressedPayload = compressor.encode(payload); payload.release(); return compressedPayload; @@ -504,23 +505,26 @@ public void sendAsync(Message message, SendCallback callback) { boolean compressed = false; // Batch will be compressed when closed // If a message has a delayed delivery time, we'll always send it individually - if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) - && payload.readableBytes() > conf.getCompressMinMsgBodySize())) { - compressedPayload = applyCompression(payload); - compressed = true; - - // validate msg-size (For batching this will be check at the batch completion size) - int compressedSize = compressedPayload.readableBytes(); - if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { - compressedPayload.release(); - String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; - PulsarClientException.InvalidMessageException invalidMessageException = - new PulsarClientException.InvalidMessageException( - format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds" - + " %d bytes", - producerName, topic, compressedStr, compressedSize, getMaxMessageSize())); - completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); - return; + if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) { + if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) { + + } else { + compressedPayload = applyCompression(payload); + compressed = true; + + // validate msg-size (For batching this will be check at the batch completion size) + int compressedSize = compressedPayload.readableBytes(); + if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { + compressedPayload.release(); + String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; + PulsarClientException.InvalidMessageException invalidMessageException = + new PulsarClientException.InvalidMessageException( + format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds" + + " %d bytes", + producerName, topic, compressedStr, compressedSize, getMaxMessageSize())); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); + return; + } } }