From 321e2559aff412e4748ab000e13fc27596dc179b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Apr 2024 14:36:37 +0800 Subject: [PATCH 1/9] - --- .../api/SimpleProducerConsumerTest.java | 46 +++++++++++++++++++ .../pulsar/client/impl/ProducerImpl.java | 8 ++-- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 4c106d39e7ad7..675df44372fd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -99,11 +99,13 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.ProducerBase; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.schema.writer.AvroWriter; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -4692,4 +4694,48 @@ public void flush(ChannelHandlerContext ctx) throws Exception { consumer.close(); admin.topics().delete(topic, false); } + + @Test + public void testPublishWithCreateMessageManually() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); + admin.topics().createNonPartitionedTopic(topic); + ProducerBase producerBase = (ProducerBase) pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + + // Create message payload, refCnf = 1 now. + ByteBuf payload1 = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); + payload1.writeByte(1); + ByteBuf payload2 = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); + payload1.writeByte(2); + log.info("payload_1.refCnf 1st: {}", payload1.refCnt()); + log.info("payload_2.refCnf 1st: {}", payload2.refCnt()); + // refCnf = 2 now. + payload1.retain(); + payload2.retain(); + log.info("payload_1.refCnf 2nd: {}", payload1.refCnt()); + log.info("payload_2.refCnf 2nd: {}", payload2.refCnt()); + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setUncompressedSize(1); + + // Publish message. + // Note: "ProducerBase.sendAsync" is not equals to "Producer.sendAsync". + MessageImpl message1 = MessageImpl.create(topic, null, messageMetadata, payload1, Optional.empty(), + null, Schema.BYTES, 0, true, 0); + MessageImpl message2 = MessageImpl.create(topic, null, messageMetadata, payload2, Optional.empty(), + null, Schema.BYTES, 0, true, 0); + // Release ByteBuf the first time, refCnf = 1 now. + producerBase.sendAsync(message1).thenAccept(ignore_ -> message1.release()); + producerBase.sendAsync(message2).thenAccept(ignore_ -> message2.release()).join(); + + // Assert payload's refCnf. + log.info("payload_1.refCnf 3rd: {}", payload1.refCnt()); + log.info("payload_2.refCnf 3rd: {}", payload2.refCnt()); + assertEquals(payload1.refCnt(), 1); + assertEquals(payload2.refCnt(), 1); + + // cleanup. + payload1.release(); + payload2.release(); + producerBase.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index dbd3aae426900..e74b058c94a34 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -404,6 +404,7 @@ public void sendComplete(Exception e) { pendingMessagesUpDownCounter.decrement(); pendingBytesUpDownCounter.subtract(msgSize); + ByteBuf payloadInCurrentMsg = interceptorMessage.getDataBuffer(); try { if (e != null) { latencyHistogram.recordFailure(latencyNanos); @@ -418,7 +419,7 @@ public void sendComplete(Exception e) { stats.incrementNumAcksReceived(latencyNanos); } } finally { - interceptorMessage.getDataBuffer().release(); + payloadInCurrentMsg.release(); } while (nextCallback != null) { @@ -426,8 +427,9 @@ public void sendComplete(Exception e) { MessageImpl msg = nextMsg; // Retain the buffer used by interceptors callback to get message. Buffer will release after // complete interceptors. + ByteBuf payloadInNextMsg = msg.getDataBuffer(); + payloadInNextMsg.retain(); try { - msg.getDataBuffer().retain(); if (e != null) { stats.incrementSendFailed(); onSendAcknowledgement(msg, null, e); @@ -440,7 +442,7 @@ public void sendComplete(Exception e) { nextMsg = nextCallback.getNextMessage(); nextCallback = nextCallback.getNextSendCallback(); } finally { - msg.getDataBuffer().release(); + ReferenceCountUtil.safeRelease(payloadInNextMsg); } } } From 8e3f7876b878ce2703e52c9acd06c5159a22bab5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Apr 2024 15:08:28 +0800 Subject: [PATCH 2/9] - --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e74b058c94a34..10e0e320462a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -419,7 +419,7 @@ public void sendComplete(Exception e) { stats.incrementNumAcksReceived(latencyNanos); } } finally { - payloadInCurrentMsg.release(); + ReferenceCountUtil.safeRelease(payloadInCurrentMsg); } while (nextCallback != null) { From f5e5b714e5acf3248e97667496069e9a51d9138b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Apr 2024 15:42:51 +0800 Subject: [PATCH 3/9] - --- .../api/SimpleProducerConsumerTest.java | 46 +++++++++++++++++-- .../pulsar/client/impl/ProducerImpl.java | 3 -- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 675df44372fd8..9879561834512 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -69,6 +69,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -4695,11 +4696,49 @@ public void flush(ChannelHandlerContext ctx) throws Exception { admin.topics().delete(topic, false); } - @Test - public void testPublishWithCreateMessageManually() throws Exception { + @DataProvider(name = "enableBatchSend") + public Object[][] enableBatchSend() { + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(dataProvider = "enableBatchSend") + public void testPublishWithCreateMessageManually(boolean enableBatchSend) throws Exception { + // Create an interceptor to verify the ref count of Message.payload is as expected. + AtomicBoolean payloadWasReleasedWhenIntercept = new AtomicBoolean(false); + ProducerInterceptor interceptor = new ProducerInterceptor(){ + + @Override + public void close() { + + } + @Override + public Message beforeSend(Producer producer, Message message) { + MessageImpl msgImpl = (MessageImpl) message; + log.info("payload.refCnf before send: {}", msgImpl.getDataBuffer().refCnt()); + if (msgImpl.getDataBuffer().refCnt() < 1) { + payloadWasReleasedWhenIntercept.set(true); + } + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + MessageImpl msgImpl = (MessageImpl) message; + log.info("payload.refCnf on send acknowledgement: {}", msgImpl.getDataBuffer().refCnt()); + if (msgImpl.getDataBuffer().refCnt() < 1) { + payloadWasReleasedWhenIntercept.set(true); + } + } + }; + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); admin.topics().createNonPartitionedTopic(topic); - ProducerBase producerBase = (ProducerBase) pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + ProducerBase producerBase = (ProducerBase) pulsarClient.newProducer().topic(topic).intercept(interceptor) + .enableBatching(enableBatchSend).create(); // Create message payload, refCnf = 1 now. ByteBuf payload1 = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); @@ -4731,6 +4770,7 @@ public void testPublishWithCreateMessageManually() throws Exception { log.info("payload_2.refCnf 3rd: {}", payload2.refCnt()); assertEquals(payload1.refCnt(), 1); assertEquals(payload2.refCnt(), 1); + assertFalse(payloadWasReleasedWhenIntercept.get()); // cleanup. payload1.release(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 10e0e320462a1..b094bcb84c5c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -425,10 +425,7 @@ public void sendComplete(Exception e) { while (nextCallback != null) { SendCallback sendCallback = nextCallback; MessageImpl msg = nextMsg; - // Retain the buffer used by interceptors callback to get message. Buffer will release after - // complete interceptors. ByteBuf payloadInNextMsg = msg.getDataBuffer(); - payloadInNextMsg.retain(); try { if (e != null) { stats.incrementSendFailed(); From 9ad0ad96c5367a2ed81e4e1bc59f143ea0fd2e27 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Apr 2024 22:16:26 +0800 Subject: [PATCH 4/9] address comments --- .../api/SimpleProducerConsumerTest.java | 118 +++++++++++++----- .../pulsar/client/impl/ProducerImpl.java | 68 +++++----- 2 files changed, 122 insertions(+), 64 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 9879561834512..7552b84a1c553 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -51,6 +51,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -93,6 +94,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ConsumerBase; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -4706,6 +4708,9 @@ public Object[][] enableBatchSend() { @Test(dataProvider = "enableBatchSend") public void testPublishWithCreateMessageManually(boolean enableBatchSend) throws Exception { + final int messageCount = 10; + final List messageArrayBeforeSend = Collections.synchronizedList(new ArrayList<>()); + final List messageArrayOnSendAcknowledgement = Collections.synchronizedList(new ArrayList<>()); // Create an interceptor to verify the ref count of Message.payload is as expected. AtomicBoolean payloadWasReleasedWhenIntercept = new AtomicBoolean(false); ProducerInterceptor interceptor = new ProducerInterceptor(){ @@ -4721,6 +4726,7 @@ public Message beforeSend(Producer producer, Message message) { if (msgImpl.getDataBuffer().refCnt() < 1) { payloadWasReleasedWhenIntercept.set(true); } + messageArrayBeforeSend.add(msgImpl); return message; } @@ -4732,6 +4738,7 @@ public void onSendAcknowledgement(Producer producer, Message message, MessageId if (msgImpl.getDataBuffer().refCnt() < 1) { payloadWasReleasedWhenIntercept.set(true); } + messageArrayOnSendAcknowledgement.add(msgImpl); } }; @@ -4740,42 +4747,93 @@ public void onSendAcknowledgement(Producer producer, Message message, MessageId ProducerBase producerBase = (ProducerBase) pulsarClient.newProducer().topic(topic).intercept(interceptor) .enableBatching(enableBatchSend).create(); - // Create message payload, refCnf = 1 now. - ByteBuf payload1 = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); - payload1.writeByte(1); - ByteBuf payload2 = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); - payload1.writeByte(2); - log.info("payload_1.refCnf 1st: {}", payload1.refCnt()); - log.info("payload_2.refCnf 1st: {}", payload2.refCnt()); - // refCnf = 2 now. - payload1.retain(); - payload2.retain(); - log.info("payload_1.refCnf 2nd: {}", payload1.refCnt()); - log.info("payload_2.refCnf 2nd: {}", payload2.refCnt()); - MessageMetadata messageMetadata = new MessageMetadata(); - messageMetadata.setUncompressedSize(1); - // Publish message. // Note: "ProducerBase.sendAsync" is not equals to "Producer.sendAsync". - MessageImpl message1 = MessageImpl.create(topic, null, messageMetadata, payload1, Optional.empty(), - null, Schema.BYTES, 0, true, 0); - MessageImpl message2 = MessageImpl.create(topic, null, messageMetadata, payload2, Optional.empty(), - null, Schema.BYTES, 0, true, 0); - // Release ByteBuf the first time, refCnf = 1 now. - producerBase.sendAsync(message1).thenAccept(ignore_ -> message1.release()); - producerBase.sendAsync(message2).thenAccept(ignore_ -> message2.release()).join(); - - // Assert payload's refCnf. - log.info("payload_1.refCnf 3rd: {}", payload1.refCnt()); - log.info("payload_2.refCnf 3rd: {}", payload2.refCnt()); - assertEquals(payload1.refCnt(), 1); - assertEquals(payload2.refCnt(), 1); + final MessageImpl[] messageArraySent = new MessageImpl[messageCount]; + final ByteBuf[] payloads = new ByteBuf[messageCount]; + List> sendFutureList = new ArrayList<>(); + List releaseFutureList = new ArrayList<>(); + for (int i = 0; i < messageCount; i++) { + // Create message payload, refCnf = 1 now. + ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1); + payloads[i] = payload; + log.info("payload_{}.refCnf 1st: {}", i, payload.refCnt()); + payload.writeByte(i); + // refCnf = 2 now. + payload.retain(); + log.info("payload_{}.refCnf 2nd: {}", i, payload.refCnt()); + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setUncompressedSize(1); + MessageImpl message1 = MessageImpl.create(topic, null, messageMetadata, payload, Optional.empty(), + null, Schema.BYTES, 0, true, 0); + messageArraySent[i] = message1; + // Release ByteBuf the first time, refCnf = 1 now. + CompletableFuture future = producerBase.sendAsync(message1); + sendFutureList.add(future); + final int indexForLog = i; + future.whenComplete((v, ex) -> { + message1.release(); + log.info("payload_{}.refCnf 3rd after_complete_refCnf: {}, ex: {}", indexForLog, payload.refCnt(), + ex == null ? "null" : ex.getMessage()); + }); + } + sendFutureList.get(messageCount - 1).join(); + + // Left 2 seconds to wait the code in the finally-block, which is using to avoid this test to be flaky. + Thread.sleep(1000 * 2); + + // Verify: payload's refCnf. + for (int i = 0; i < messageCount; i++) { + log.info("payload_{}.refCnf 4th: {}", i, payloads[i].refCnt()); + assertEquals(payloads[i].refCnt(), 1); + } + + // Verify: the messages has not been released when calling interceptor. assertFalse(payloadWasReleasedWhenIntercept.get()); + // Verify: the order of send complete event. + MessageIdImpl messageIdPreviousOne = null; + for (int i = 0; i < messageCount; i++) { + MessageIdImpl messageId = (MessageIdImpl) sendFutureList.get(i).get(); + if (messageIdPreviousOne != null) { + assertTrue(compareMessageIds(messageIdPreviousOne, messageId) > 0); + } + messageIdPreviousOne = messageId; + } + + // Verify: the order of interceptor events. + for (int i = 0; i < messageCount; i++) { + assertTrue(messageArraySent[i] == messageArrayBeforeSend.get(i)); + assertTrue(messageArraySent[i] == messageArrayOnSendAcknowledgement.get(i)); + } + // cleanup. - payload1.release(); - payload2.release(); + for (int i = 0; i < messageCount; i++) { + payloads[i].release(); + } producerBase.close(); admin.topics().delete(topic, false); } + + private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2) { + if (messageId2.getLedgerId() < messageId1.getLedgerId()) { + return -1; + } + if (messageId2.getLedgerId() > messageId1.getLedgerId()) { + return 1; + } + if (messageId2.getEntryId() < messageId1.getEntryId()) { + return -1; + } + if (messageId2.getEntryId() > messageId1.getEntryId()) { + return 1; + } + if (messageId2 instanceof BatchMessageIdImpl && messageId1 instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId1 = (BatchMessageIdImpl) messageId1; + BatchMessageIdImpl batchMessageId2 = (BatchMessageIdImpl) messageId2; + return batchMessageId2.getBatchIndex() - batchMessageId1.getBatchIndex(); + } else { + return 0; + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b094bcb84c5c1..e1776bc45181b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -400,46 +400,46 @@ public MessageImpl getNextMessage() { @Override public void sendComplete(Exception e) { + SendCallback loopingCallback = this; + MessageImpl loopingMsg = interceptorMessage; + while (loopingCallback != null) { + onSendComplete(e, loopingCallback, loopingMsg); + loopingMsg = loopingCallback.getNextMessage(); + loopingCallback = loopingCallback.getNextSendCallback(); + } + } + + private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl msg) { long latencyNanos = System.nanoTime() - createdAt; pendingMessagesUpDownCounter.decrement(); pendingBytesUpDownCounter.subtract(msgSize); - - ByteBuf payloadInCurrentMsg = interceptorMessage.getDataBuffer(); - try { - if (e != null) { - latencyHistogram.recordFailure(latencyNanos); - stats.incrementSendFailed(); - onSendAcknowledgement(interceptorMessage, null, e); - future.completeExceptionally(e); - } else { - latencyHistogram.recordSuccess(latencyNanos); - publishedBytesCounter.add(msgSize); - onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null); - future.complete(interceptorMessage.getMessageId()); - stats.incrementNumAcksReceived(latencyNanos); - } - } finally { - ReferenceCountUtil.safeRelease(payloadInCurrentMsg); - } - - while (nextCallback != null) { - SendCallback sendCallback = nextCallback; - MessageImpl msg = nextMsg; - ByteBuf payloadInNextMsg = msg.getDataBuffer(); + ByteBuf payload = msg.getDataBuffer(); + if (e != null) { + latencyHistogram.recordFailure(latencyNanos); + stats.incrementSendFailed(); try { - if (e != null) { - stats.incrementSendFailed(); - onSendAcknowledgement(msg, null, e); - sendCallback.getFuture().completeExceptionally(e); - } else { - onSendAcknowledgement(msg, msg.getMessageId(), null); - sendCallback.getFuture().complete(msg.getMessageId()); - stats.incrementNumAcksReceived(System.nanoTime() - createdAt); + onSendAcknowledgement(msg, null, e); + sendCallback.getFuture().completeExceptionally(e); + } finally { + if (payload == null) { + log.error("[{}] [{}] Payload is null when calling a failed onSendComplete, which is not" + + " expected.", topic, producerName); } - nextMsg = nextCallback.getNextMessage(); - nextCallback = nextCallback.getNextSendCallback(); + ReferenceCountUtil.safeRelease(payload); + } + } else { + latencyHistogram.recordSuccess(latencyNanos); + publishedBytesCounter.add(msgSize); + stats.incrementNumAcksReceived(latencyNanos); + try { + onSendAcknowledgement(msg, msg.getMessageId(), null); + sendCallback.getFuture().complete(msg.getMessageId()); } finally { - ReferenceCountUtil.safeRelease(payloadInNextMsg); + if (payload == null) { + log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", + topic, producerName); + } + ReferenceCountUtil.safeRelease(payload); } } } From 8d382f7e5f9a2c5328a24e772d8b2e71772dee85 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Apr 2024 22:28:11 +0800 Subject: [PATCH 5/9] fix the inaccurate metrics: latencyNanos --- .../impl/DefaultSendMessageCallback.java | 4 + .../pulsar/client/impl/ProducerImpl.java | 140 ++++++++++-------- 2 files changed, 81 insertions(+), 63 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java new file mode 100644 index 0000000000000..f9f2d59d750a9 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java @@ -0,0 +1,4 @@ +package org.apache.pulsar.client.impl; + +public class DefaultSendMessageCallback { +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e1776bc45181b..f5d8b390d0e31 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -378,79 +378,93 @@ CompletableFuture internalSendAsync(Message message) { pendingMessagesUpDownCounter.increment(); pendingBytesUpDownCounter.add(msgSize); - sendAsync(interceptorMessage, new SendCallback() { - SendCallback nextCallback = null; - MessageImpl nextMsg = null; - long createdAt = System.nanoTime(); + sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, interceptorMessage, msgSize)); + return future; + } - @Override - public CompletableFuture getFuture() { - return future; - } + private class DefaultSendMessageCallback implements SendCallback { - @Override - public SendCallback getNextSendCallback() { - return nextCallback; - } + CompletableFuture sendFuture; + MessageImpl currentMsg; + int msgSize; + long createdAt = System.nanoTime(); + SendCallback nextCallback = null; + MessageImpl nextMsg = null; - @Override - public MessageImpl getNextMessage() { - return nextMsg; - } + DefaultSendMessageCallback(CompletableFuture sendFuture, MessageImpl currentMsg, int msgSize) { + this.sendFuture = sendFuture; + this.currentMsg = currentMsg; + this.msgSize = msgSize; + } - @Override - public void sendComplete(Exception e) { - SendCallback loopingCallback = this; - MessageImpl loopingMsg = interceptorMessage; - while (loopingCallback != null) { - onSendComplete(e, loopingCallback, loopingMsg); - loopingMsg = loopingCallback.getNextMessage(); - loopingCallback = loopingCallback.getNextSendCallback(); - } - } + @Override + public CompletableFuture getFuture() { + return sendFuture; + } - private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl msg) { - long latencyNanos = System.nanoTime() - createdAt; - pendingMessagesUpDownCounter.decrement(); - pendingBytesUpDownCounter.subtract(msgSize); - ByteBuf payload = msg.getDataBuffer(); - if (e != null) { - latencyHistogram.recordFailure(latencyNanos); - stats.incrementSendFailed(); - try { - onSendAcknowledgement(msg, null, e); - sendCallback.getFuture().completeExceptionally(e); - } finally { - if (payload == null) { - log.error("[{}] [{}] Payload is null when calling a failed onSendComplete, which is not" - + " expected.", topic, producerName); - } - ReferenceCountUtil.safeRelease(payload); + @Override + public SendCallback getNextSendCallback() { + return nextCallback; + } + + @Override + public MessageImpl getNextMessage() { + return nextMsg; + } + + @Override + public void sendComplete(Exception e) { + SendCallback loopingCallback = this; + MessageImpl loopingMsg = currentMsg; + while (loopingCallback != null) { + onSendComplete(e, loopingCallback, loopingMsg); + loopingMsg = loopingCallback.getNextMessage(); + loopingCallback = loopingCallback.getNextSendCallback(); + } + } + + private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl msg) { + long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback) ? + ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt; + long latencyNanos = System.nanoTime() - createdAt; + pendingMessagesUpDownCounter.decrement(); + pendingBytesUpDownCounter.subtract(msgSize); + ByteBuf payload = msg.getDataBuffer(); + if (e != null) { + latencyHistogram.recordFailure(latencyNanos); + stats.incrementSendFailed(); + try { + onSendAcknowledgement(msg, null, e); + sendCallback.getFuture().completeExceptionally(e); + } finally { + if (payload == null) { + log.error("[{}] [{}] Payload is null when calling a failed onSendComplete, which is not" + + " expected.", topic, producerName); } - } else { - latencyHistogram.recordSuccess(latencyNanos); - publishedBytesCounter.add(msgSize); - stats.incrementNumAcksReceived(latencyNanos); - try { - onSendAcknowledgement(msg, msg.getMessageId(), null); - sendCallback.getFuture().complete(msg.getMessageId()); - } finally { - if (payload == null) { - log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", - topic, producerName); - } - ReferenceCountUtil.safeRelease(payload); + ReferenceCountUtil.safeRelease(payload); + } + } else { + latencyHistogram.recordSuccess(latencyNanos); + publishedBytesCounter.add(msgSize); + stats.incrementNumAcksReceived(latencyNanos); + try { + onSendAcknowledgement(msg, msg.getMessageId(), null); + sendCallback.getFuture().complete(msg.getMessageId()); + } finally { + if (payload == null) { + log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", + topic, producerName); } + ReferenceCountUtil.safeRelease(payload); } } + } - @Override - public void addCallback(MessageImpl msg, SendCallback scb) { - nextMsg = msg; - nextCallback = scb; - } - }); - return future; + @Override + public void addCallback(MessageImpl msg, SendCallback scb) { + nextMsg = msg; + nextCallback = scb; + } } @Override From bf6ba586b6853f958dec76f540b675915d96c668 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Apr 2024 22:37:03 +0800 Subject: [PATCH 6/9] remove unnecessary class --- .../apache/pulsar/client/impl/DefaultSendMessageCallback.java | 4 ---- 1 file changed, 4 deletions(-) delete mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java deleted file mode 100644 index f9f2d59d750a9..0000000000000 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultSendMessageCallback.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.apache.pulsar.client.impl; - -public class DefaultSendMessageCallback { -} From d3e792651e6f6f00e48de45b069435d12d25d9ce Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 2 Apr 2024 22:47:34 +0800 Subject: [PATCH 7/9] remove unnecessary class --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index f5d8b390d0e31..5c308ceb0a43a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -424,8 +424,8 @@ public void sendComplete(Exception e) { } private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl msg) { - long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback) ? - ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt; + long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback) + ? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt; long latencyNanos = System.nanoTime() - createdAt; pendingMessagesUpDownCounter.decrement(); pendingBytesUpDownCounter.subtract(msgSize); From ca585b252bb7aa1af2c53713653221e9c88e76db Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 3 Apr 2024 00:26:05 +0800 Subject: [PATCH 8/9] address comment --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 5c308ceb0a43a..5f624bfd9d4b1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -440,6 +440,7 @@ private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl< if (payload == null) { log.error("[{}] [{}] Payload is null when calling a failed onSendComplete, which is not" + " expected.", topic, producerName); + return; } ReferenceCountUtil.safeRelease(payload); } @@ -454,6 +455,7 @@ private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl< if (payload == null) { log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", topic, producerName); + return; } ReferenceCountUtil.safeRelease(payload); } From 68bdbb48d88443a4b456c3d9c7cd67679e21f6dc Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 3 Apr 2024 10:36:04 +0800 Subject: [PATCH 9/9] address comment --- .../pulsar/client/impl/ProducerImpl.java | 32 ++++++------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 5f624bfd9d4b1..b8def7e3042bd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -430,35 +430,23 @@ private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl< pendingMessagesUpDownCounter.decrement(); pendingBytesUpDownCounter.subtract(msgSize); ByteBuf payload = msg.getDataBuffer(); + if (payload == null) { + log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", + topic, producerName); + } else { + ReferenceCountUtil.safeRelease(payload); + } if (e != null) { latencyHistogram.recordFailure(latencyNanos); stats.incrementSendFailed(); - try { - onSendAcknowledgement(msg, null, e); - sendCallback.getFuture().completeExceptionally(e); - } finally { - if (payload == null) { - log.error("[{}] [{}] Payload is null when calling a failed onSendComplete, which is not" - + " expected.", topic, producerName); - return; - } - ReferenceCountUtil.safeRelease(payload); - } + onSendAcknowledgement(msg, null, e); + sendCallback.getFuture().completeExceptionally(e); } else { latencyHistogram.recordSuccess(latencyNanos); publishedBytesCounter.add(msgSize); stats.incrementNumAcksReceived(latencyNanos); - try { - onSendAcknowledgement(msg, msg.getMessageId(), null); - sendCallback.getFuture().complete(msg.getMessageId()); - } finally { - if (payload == null) { - log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.", - topic, producerName); - return; - } - ReferenceCountUtil.safeRelease(payload); - } + onSendAcknowledgement(msg, msg.getMessageId(), null); + sendCallback.getFuture().complete(msg.getMessageId()); } }