diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index eb4a322f3c3a1..e76d6d6962c8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -51,6 +51,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -69,6 +70,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -92,6 +94,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -99,11 +102,13 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.ProducerBase; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.schema.writer.AvroWriter; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -4692,4 +4697,143 @@ public void flush(ChannelHandlerContext ctx) throws Exception { consumer.close(); admin.topics().delete(topic, false); } + + @DataProvider(name = "enableBatchSend") + public Object[][] enableBatchSend() { + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(dataProvider = "enableBatchSend") + public void testPublishWithCreateMessageManually(boolean enableBatchSend) throws Exception { + final int messageCount = 10; + final List messageArrayBeforeSend = Collections.synchronizedList(new ArrayList<>()); + final List messageArrayOnSendAcknowledgement = Collections.synchronizedList(new ArrayList<>()); + // Create an interceptor to verify the ref count of Message.payload is as expected. + AtomicBoolean payloadWasReleasedWhenIntercept = new AtomicBoolean(false); + ProducerInterceptor interceptor = new ProducerInterceptor(){ + + @Override + public void close() { + + } + @Override + public Message beforeSend(Producer producer, Message message) { + MessageImpl msgImpl = (MessageImpl) message; + log.info("payload.refCnf before send: {}", msgImpl.getDataBuffer().refCnt()); + if (msgImpl.getDataBuffer().refCnt() < 1) { + payloadWasReleasedWhenIntercept.set(true); + } + messageArrayBeforeSend.add(msgImpl); + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + MessageImpl msgImpl = (MessageImpl) message; + log.info("payload.refCnf on send acknowledgement: {}", msgImpl.getDataBuffer().refCnt()); + if (msgImpl.getDataBuffer().refCnt() < 1) { + payloadWasReleasedWhenIntercept.set(true); + } + messageArrayOnSendAcknowledgement.add(msgImpl); + } + }; + + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); + admin.topics().createNonPartitionedTopic(topic); + ProducerBase producerBase = (ProducerBase) pulsarClient.newProducer().topic(topic).intercept(interceptor) + .enableBatching(enableBatchSend).create(); + + // Publish message. + // Note: "ProducerBase.sendAsync" is not equals to "Producer.sendAsync". + final MessageImpl[] messageArraySent = new MessageImpl[messageCount]; + final ByteBuf[] payloads = new ByteBuf[messageCount]; + List> sendFutureList = new ArrayList<>(); + List releaseFutureList = new ArrayList<>(); + for (int i = 0; i < messageCount; i++) { + // Create message payload, refCnf = 1 now. + ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); + payloads[i] = payload; + log.info("payload_{}.refCnf 1st: {}", i, payload.refCnt()); + payload.writeByte(i); + // refCnf = 2 now. + payload.retain(); + log.info("payload_{}.refCnf 2nd: {}", i, payload.refCnt()); + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setUncompressedSize(1); + MessageImpl message1 = MessageImpl.create(topic, null, messageMetadata, payload, Optional.empty(), + null, Schema.BYTES, 0, true, 0); + messageArraySent[i] = message1; + // Release ByteBuf the first time, refCnf = 1 now. + CompletableFuture future = producerBase.sendAsync(message1); + sendFutureList.add(future); + final int indexForLog = i; + future.whenComplete((v, ex) -> { + message1.release(); + log.info("payload_{}.refCnf 3rd after_complete_refCnf: {}, ex: {}", indexForLog, payload.refCnt(), + ex == null ? "null" : ex.getMessage()); + }); + } + sendFutureList.get(messageCount - 1).join(); + + // Left 2 seconds to wait the code in the finally-block, which is using to avoid this test to be flaky. + Thread.sleep(1000 * 2); + + // Verify: payload's refCnf. + for (int i = 0; i < messageCount; i++) { + log.info("payload_{}.refCnf 4th: {}", i, payloads[i].refCnt()); + assertEquals(payloads[i].refCnt(), 1); + } + + // Verify: the messages has not been released when calling interceptor. + assertFalse(payloadWasReleasedWhenIntercept.get()); + + // Verify: the order of send complete event. + MessageIdImpl messageIdPreviousOne = null; + for (int i = 0; i < messageCount; i++) { + MessageIdImpl messageId = (MessageIdImpl) sendFutureList.get(i).get(); + if (messageIdPreviousOne != null) { + assertTrue(compareMessageIds(messageIdPreviousOne, messageId) > 0); + } + messageIdPreviousOne = messageId; + } + + // Verify: the order of interceptor events. + for (int i = 0; i < messageCount; i++) { + assertTrue(messageArraySent[i] == messageArrayBeforeSend.get(i)); + assertTrue(messageArraySent[i] == messageArrayOnSendAcknowledgement.get(i)); + } + + // cleanup. + for (int i = 0; i < messageCount; i++) { + payloads[i].release(); + } + producerBase.close(); + admin.topics().delete(topic, false); + } + + private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2) { + if (messageId2.getLedgerId() < messageId1.getLedgerId()) { + return -1; + } + if (messageId2.getLedgerId() > messageId1.getLedgerId()) { + return 1; + } + if (messageId2.getEntryId() < messageId1.getEntryId()) { + return -1; + } + if (messageId2.getEntryId() > messageId1.getEntryId()) { + return 1; + } + if (messageId2 instanceof BatchMessageIdImpl && messageId1 instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId1 = (BatchMessageIdImpl) messageId1; + BatchMessageIdImpl batchMessageId2 = (BatchMessageIdImpl) messageId2; + return batchMessageId2.getBatchIndex() - batchMessageId1.getBatchIndex(); + } else { + return 0; + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 8d345f0e8b1f6..2bd74db0d460d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -336,73 +336,80 @@ CompletableFuture internalSendAsync(Message message) { if (interceptors != null) { interceptorMessage.getProperties(); } - sendAsync(interceptorMessage, new SendCallback() { - SendCallback nextCallback = null; - MessageImpl nextMsg = null; - long createdAt = System.nanoTime(); - @Override - public CompletableFuture getFuture() { - return future; - } + int msgSize = interceptorMessage.getDataBuffer().readableBytes(); + sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, interceptorMessage, msgSize)); + return future; + } - @Override - public SendCallback getNextSendCallback() { - return nextCallback; - } + private class DefaultSendMessageCallback implements SendCallback { - @Override - public MessageImpl getNextMessage() { - return nextMsg; - } + CompletableFuture sendFuture; + MessageImpl currentMsg; + int msgSize; + long createdAt = System.nanoTime(); + SendCallback nextCallback = null; + MessageImpl nextMsg = null; - @Override - public void sendComplete(Exception e) { - try { - if (e != null) { - stats.incrementSendFailed(); - onSendAcknowledgement(interceptorMessage, null, e); - future.completeExceptionally(e); - } else { - onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null); - future.complete(interceptorMessage.getMessageId()); - stats.incrementNumAcksReceived(System.nanoTime() - createdAt); - } - } finally { - interceptorMessage.getDataBuffer().release(); - } + DefaultSendMessageCallback(CompletableFuture sendFuture, MessageImpl currentMsg, int msgSize) { + this.sendFuture = sendFuture; + this.currentMsg = currentMsg; + this.msgSize = msgSize; + } - while (nextCallback != null) { - SendCallback sendCallback = nextCallback; - MessageImpl msg = nextMsg; - // Retain the buffer used by interceptors callback to get message. Buffer will release after - // complete interceptors. - try { - msg.getDataBuffer().retain(); - if (e != null) { - stats.incrementSendFailed(); - onSendAcknowledgement(msg, null, e); - sendCallback.getFuture().completeExceptionally(e); - } else { - onSendAcknowledgement(msg, msg.getMessageId(), null); - sendCallback.getFuture().complete(msg.getMessageId()); - stats.incrementNumAcksReceived(System.nanoTime() - createdAt); - } - nextMsg = nextCallback.getNextMessage(); - nextCallback = nextCallback.getNextSendCallback(); - } finally { - msg.getDataBuffer().release(); - } - } - } + @Override + public CompletableFuture getFuture() { + return sendFuture; + } - @Override - public void addCallback(MessageImpl msg, SendCallback scb) { - nextMsg = msg; - nextCallback = scb; + @Override + public SendCallback getNextSendCallback() { + return nextCallback; + } + + @Override + public MessageImpl getNextMessage() { + return nextMsg; + } + + @Override + public void sendComplete(Exception e) { + SendCallback loopingCallback = this; + MessageImpl loopingMsg = currentMsg; + while (loopingCallback != null) { + onSendComplete(e, loopingCallback, loopingMsg); + loopingMsg = loopingCallback.getNextMessage(); + loopingCallback = loopingCallback.getNextSendCallback(); + } + } + + private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl msg) { + long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback) + ? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt; + long latencyNanos = System.nanoTime() - createdAt; + ByteBuf payload = msg.getDataBuffer(); + if (payload == null) { + log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", + topic, producerName); + } else { + ReferenceCountUtil.safeRelease(payload); } - }); - return future; + if (e != null) { + stats.incrementSendFailed(); + onSendAcknowledgement(msg, null, e); + sendCallback.getFuture().completeExceptionally(e); + } else { + stats.incrementNumAcksReceived(latencyNanos); + onSendAcknowledgement(msg, msg.getMessageId(), null); + sendCallback.getFuture().complete(msg.getMessageId()); + } + } + + @Override + public void addCallback(MessageImpl msg, SendCallback scb) { + nextMsg = msg; + nextCallback = scb; + } } @Override