Skip to content

Commit

Permalink
[fix][client] Fix client side memory leak when call MessageImpl.creat…
Browse files Browse the repository at this point in the history
…e and fix imprecise client-side metrics: pendingMessagesUpDownCounter, pendingBytesUpDownCounter, latencyHistogram (apache#22393)

(cherry picked from commit 2469b97)
(cherry picked from commit 14b6279)
  • Loading branch information
poorbarcode authored and mukesh-ctds committed Apr 15, 2024
1 parent 88a9664 commit 246f0ee
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -69,6 +70,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -92,18 +94,21 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -4692,4 +4697,143 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
consumer.close();
admin.topics().delete(topic, false);
}

@DataProvider(name = "enableBatchSend")
public Object[][] enableBatchSend() {
return new Object[][]{
{true},
{false}
};
}

@Test(dataProvider = "enableBatchSend")
public void testPublishWithCreateMessageManually(boolean enableBatchSend) throws Exception {
final int messageCount = 10;
final List<MessageImpl> messageArrayBeforeSend = Collections.synchronizedList(new ArrayList<>());
final List<MessageImpl> messageArrayOnSendAcknowledgement = Collections.synchronizedList(new ArrayList<>());
// Create an interceptor to verify the ref count of Message.payload is as expected.
AtomicBoolean payloadWasReleasedWhenIntercept = new AtomicBoolean(false);
ProducerInterceptor interceptor = new ProducerInterceptor(){

@Override
public void close() {

}
@Override
public Message beforeSend(Producer producer, Message message) {
MessageImpl msgImpl = (MessageImpl) message;
log.info("payload.refCnf before send: {}", msgImpl.getDataBuffer().refCnt());
if (msgImpl.getDataBuffer().refCnt() < 1) {
payloadWasReleasedWhenIntercept.set(true);
}
messageArrayBeforeSend.add(msgImpl);
return message;
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
MessageImpl msgImpl = (MessageImpl) message;
log.info("payload.refCnf on send acknowledgement: {}", msgImpl.getDataBuffer().refCnt());
if (msgImpl.getDataBuffer().refCnt() < 1) {
payloadWasReleasedWhenIntercept.set(true);
}
messageArrayOnSendAcknowledgement.add(msgImpl);
}
};

final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
admin.topics().createNonPartitionedTopic(topic);
ProducerBase producerBase = (ProducerBase) pulsarClient.newProducer().topic(topic).intercept(interceptor)
.enableBatching(enableBatchSend).create();

// Publish message.
// Note: "ProducerBase.sendAsync" is not equals to "Producer.sendAsync".
final MessageImpl[] messageArraySent = new MessageImpl[messageCount];
final ByteBuf[] payloads = new ByteBuf[messageCount];
List<CompletableFuture<MessageId>> sendFutureList = new ArrayList<>();
List<CompletableFuture> releaseFutureList = new ArrayList<>();
for (int i = 0; i < messageCount; i++) {
// Create message payload, refCnf = 1 now.
ByteBuf payload = PulsarByteBufAllocator.DEFAULT.heapBuffer(1);
payloads[i] = payload;
log.info("payload_{}.refCnf 1st: {}", i, payload.refCnt());
payload.writeByte(i);
// refCnf = 2 now.
payload.retain();
log.info("payload_{}.refCnf 2nd: {}", i, payload.refCnt());
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.setUncompressedSize(1);
MessageImpl<byte[]> message1 = MessageImpl.create(topic, null, messageMetadata, payload, Optional.empty(),
null, Schema.BYTES, 0, true, 0);
messageArraySent[i] = message1;
// Release ByteBuf the first time, refCnf = 1 now.
CompletableFuture<MessageId> future = producerBase.sendAsync(message1);
sendFutureList.add(future);
final int indexForLog = i;
future.whenComplete((v, ex) -> {
message1.release();
log.info("payload_{}.refCnf 3rd after_complete_refCnf: {}, ex: {}", indexForLog, payload.refCnt(),
ex == null ? "null" : ex.getMessage());
});
}
sendFutureList.get(messageCount - 1).join();

// Left 2 seconds to wait the code in the finally-block, which is using to avoid this test to be flaky.
Thread.sleep(1000 * 2);

// Verify: payload's refCnf.
for (int i = 0; i < messageCount; i++) {
log.info("payload_{}.refCnf 4th: {}", i, payloads[i].refCnt());
assertEquals(payloads[i].refCnt(), 1);
}

// Verify: the messages has not been released when calling interceptor.
assertFalse(payloadWasReleasedWhenIntercept.get());

// Verify: the order of send complete event.
MessageIdImpl messageIdPreviousOne = null;
for (int i = 0; i < messageCount; i++) {
MessageIdImpl messageId = (MessageIdImpl) sendFutureList.get(i).get();
if (messageIdPreviousOne != null) {
assertTrue(compareMessageIds(messageIdPreviousOne, messageId) > 0);
}
messageIdPreviousOne = messageId;
}

// Verify: the order of interceptor events.
for (int i = 0; i < messageCount; i++) {
assertTrue(messageArraySent[i] == messageArrayBeforeSend.get(i));
assertTrue(messageArraySent[i] == messageArrayOnSendAcknowledgement.get(i));
}

// cleanup.
for (int i = 0; i < messageCount; i++) {
payloads[i].release();
}
producerBase.close();
admin.topics().delete(topic, false);
}

private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl messageId2) {
if (messageId2.getLedgerId() < messageId1.getLedgerId()) {
return -1;
}
if (messageId2.getLedgerId() > messageId1.getLedgerId()) {
return 1;
}
if (messageId2.getEntryId() < messageId1.getEntryId()) {
return -1;
}
if (messageId2.getEntryId() > messageId1.getEntryId()) {
return 1;
}
if (messageId2 instanceof BatchMessageIdImpl && messageId1 instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId1 = (BatchMessageIdImpl) messageId1;
BatchMessageIdImpl batchMessageId2 = (BatchMessageIdImpl) messageId2;
return batchMessageId2.getBatchIndex() - batchMessageId1.getBatchIndex();
} else {
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,73 +336,80 @@ CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
if (interceptors != null) {
interceptorMessage.getProperties();
}
sendAsync(interceptorMessage, new SendCallback() {
SendCallback nextCallback = null;
MessageImpl<?> nextMsg = null;
long createdAt = System.nanoTime();

@Override
public CompletableFuture<MessageId> getFuture() {
return future;
}
int msgSize = interceptorMessage.getDataBuffer().readableBytes();
sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, interceptorMessage, msgSize));
return future;
}

@Override
public SendCallback getNextSendCallback() {
return nextCallback;
}
private class DefaultSendMessageCallback implements SendCallback {

@Override
public MessageImpl<?> getNextMessage() {
return nextMsg;
}
CompletableFuture<MessageId> sendFuture;
MessageImpl<?> currentMsg;
int msgSize;
long createdAt = System.nanoTime();
SendCallback nextCallback = null;
MessageImpl<?> nextMsg = null;

@Override
public void sendComplete(Exception e) {
try {
if (e != null) {
stats.incrementSendFailed();
onSendAcknowledgement(interceptorMessage, null, e);
future.completeExceptionally(e);
} else {
onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
future.complete(interceptorMessage.getMessageId());
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
}
} finally {
interceptorMessage.getDataBuffer().release();
}
DefaultSendMessageCallback(CompletableFuture<MessageId> sendFuture, MessageImpl<?> currentMsg, int msgSize) {
this.sendFuture = sendFuture;
this.currentMsg = currentMsg;
this.msgSize = msgSize;
}

while (nextCallback != null) {
SendCallback sendCallback = nextCallback;
MessageImpl<?> msg = nextMsg;
// Retain the buffer used by interceptors callback to get message. Buffer will release after
// complete interceptors.
try {
msg.getDataBuffer().retain();
if (e != null) {
stats.incrementSendFailed();
onSendAcknowledgement(msg, null, e);
sendCallback.getFuture().completeExceptionally(e);
} else {
onSendAcknowledgement(msg, msg.getMessageId(), null);
sendCallback.getFuture().complete(msg.getMessageId());
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
}
nextMsg = nextCallback.getNextMessage();
nextCallback = nextCallback.getNextSendCallback();
} finally {
msg.getDataBuffer().release();
}
}
}
@Override
public CompletableFuture<MessageId> getFuture() {
return sendFuture;
}

@Override
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
nextMsg = msg;
nextCallback = scb;
@Override
public SendCallback getNextSendCallback() {
return nextCallback;
}

@Override
public MessageImpl<?> getNextMessage() {
return nextMsg;
}

@Override
public void sendComplete(Exception e) {
SendCallback loopingCallback = this;
MessageImpl<?> loopingMsg = currentMsg;
while (loopingCallback != null) {
onSendComplete(e, loopingCallback, loopingMsg);
loopingMsg = loopingCallback.getNextMessage();
loopingCallback = loopingCallback.getNextSendCallback();
}
}

private void onSendComplete(Exception e, SendCallback sendCallback, MessageImpl<?> msg) {
long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback)
? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt;
long latencyNanos = System.nanoTime() - createdAt;
ByteBuf payload = msg.getDataBuffer();
if (payload == null) {
log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.",
topic, producerName);
} else {
ReferenceCountUtil.safeRelease(payload);
}
});
return future;
if (e != null) {
stats.incrementSendFailed();
onSendAcknowledgement(msg, null, e);
sendCallback.getFuture().completeExceptionally(e);
} else {
stats.incrementNumAcksReceived(latencyNanos);
onSendAcknowledgement(msg, msg.getMessageId(), null);
sendCallback.getFuture().complete(msg.getMessageId());
}
}

@Override
public void addCallback(MessageImpl<?> msg, SendCallback scb) {
nextMsg = msg;
nextCallback = scb;
}
}

@Override
Expand Down

0 comments on commit 246f0ee

Please sign in to comment.