Skip to content

Commit

Permalink
[fix][broker] Miss headersAndPayload in MessagePublishContext
Browse files Browse the repository at this point in the history
  • Loading branch information
kecona committed Sep 27, 2023
1 parent 7ed452e commit dab6bb3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,15 +251,15 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l
boolean isMarker) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize,
headersAndPayload, batchSize,
isChunked, System.nanoTime(), isMarker));
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
long batchSize, boolean isChunked, boolean isMarker) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
highestSequenceId, msgIn, headersAndPayload, batchSize,
isChunked, System.nanoTime(), isMarker));
}

Expand Down Expand Up @@ -357,6 +357,8 @@ private static final class MessagePublishContext implements PublishContext, Runn
private long highestSequenceId;
private long originalHighestSequenceId;

private ByteBuf headerAndPayload;

public String getProducerName() {
return producer.getProducerName();
}
Expand Down Expand Up @@ -493,39 +495,42 @@ public void run() {
recycle();
}

static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, ByteBuf headersAndPayload,
long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
callback.rateIn = rateIn;
callback.msgSize = msgSize;
callback.msgSize = headersAndPayload.readableBytes();
callback.batchSize = batchSize;
callback.chunked = chunked;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
callback.headerAndPayload = headersAndPayload;
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
return callback;
}

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) {
ByteBuf headersAndPayload, long batchSize, boolean chunked, long startTimeNs,
boolean isMarker) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
callback.highestSequenceId = highestSequenceId;
callback.rateIn = rateIn;
callback.msgSize = msgSize;
callback.msgSize = headersAndPayload.readableBytes();
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
callback.headerAndPayload = headersAndPayload;
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
Expand All @@ -542,6 +547,11 @@ public boolean isMarkerMessage() {
return isMarker;
}

@Override
public ByteBuf getHeaderAndPayload() {
return headerAndPayload;
}

private final Handle<MessagePublishContext> recyclerHandle;

private MessagePublishContext(Handle<MessagePublishContext> recyclerHandle) {
Expand All @@ -568,6 +578,7 @@ public void recycle() {
startTimeNs = -1L;
chunked = false;
isMarker = false;
headerAndPayload = null;
if (propertyMap != null) {
propertyMap.clear();
}
Expand Down Expand Up @@ -735,7 +746,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize);
topic.publishTxnMessage(txnID, headersAndPayload,
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker));
headersAndPayload, batchSize, isChunked, System.nanoTime(), isMarker));
}

public SchemaVersion getSchemaVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ default Object getProperty(String propertyName) {
default boolean isChunked() {
return false;
}

default ByteBuf getHeaderAndPayload() {
return null;
}
}

CompletableFuture<Void> initialize();
Expand Down

0 comments on commit dab6bb3

Please sign in to comment.