From dab6bb393d4c4e1762860496a371c20fb192a7ef Mon Sep 17 00:00:00 2001 From: kecona Date: Thu, 28 Sep 2023 00:15:57 +0800 Subject: [PATCH] [fix][broker] Miss headersAndPayload in MessagePublishContext --- .../pulsar/broker/service/Producer.java | 25 +++++++++++++------ .../apache/pulsar/broker/service/Topic.java | 4 +++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 6ad07a70a3796..1b204ac55e779 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -251,7 +251,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l boolean isMarker) { topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, sequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, + headersAndPayload, batchSize, isChunked, System.nanoTime(), isMarker)); } @@ -259,7 +259,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenc long batchSize, boolean isChunked, boolean isMarker) { topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, + highestSequenceId, msgIn, headersAndPayload, batchSize, isChunked, System.nanoTime(), isMarker)); } @@ -357,6 +357,8 @@ private static final class MessagePublishContext implements PublishContext, Runn private long highestSequenceId; private long originalHighestSequenceId; + private ByteBuf headerAndPayload; + public String getProducerName() { return producer.getProducerName(); } @@ -493,19 +495,20 @@ public void run() { recycle(); } - static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, + static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, ByteBuf headersAndPayload, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; callback.rateIn = rateIn; - callback.msgSize = msgSize; + callback.msgSize = headersAndPayload.readableBytes(); callback.batchSize = batchSize; callback.chunked = chunked; callback.originalProducerName = null; callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; + callback.headerAndPayload = headersAndPayload; if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -513,19 +516,21 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { + ByteBuf headersAndPayload, long batchSize, boolean chunked, long startTimeNs, + boolean isMarker) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; callback.highestSequenceId = highestSequenceId; callback.rateIn = rateIn; - callback.msgSize = msgSize; + callback.msgSize = headersAndPayload.readableBytes(); callback.batchSize = batchSize; callback.originalProducerName = null; callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; + callback.headerAndPayload = headersAndPayload; if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -542,6 +547,11 @@ public boolean isMarkerMessage() { return isMarker; } + @Override + public ByteBuf getHeaderAndPayload() { + return headerAndPayload; + } + private final Handle recyclerHandle; private MessagePublishContext(Handle recyclerHandle) { @@ -568,6 +578,7 @@ public void recycle() { startTimeNs = -1L; chunked = false; isMarker = false; + headerAndPayload = null; if (propertyMap != null) { propertyMap.clear(); } @@ -735,7 +746,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize); topic.publishTxnMessage(txnID, headersAndPayload, MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker)); + headersAndPayload, batchSize, isChunked, System.nanoTime(), isMarker)); } public SchemaVersion getSchemaVersion() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index e2ffb41390a7e..552a122664af8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -112,6 +112,10 @@ default Object getProperty(String propertyName) { default boolean isChunked() { return false; } + + default ByteBuf getHeaderAndPayload() { + return null; + } } CompletableFuture initialize();