Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NPE when MultiTopicsConsumerImpl receives null value messages #9113

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand All @@ -52,13 +57,23 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void nullValueBytesSchemaTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/null-value-bytes-test";
@DataProvider(name = "topics")
public static Object[][] topics() {
return new Object[][]{
{"persistent://prop/ns-abc/null-value-test-0", 1},
{"persistent://prop/ns-abc/null-value-test-1", 3},
};
}

@Test(dataProvider = "topics")
public void nullValueBytesSchemaTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Expand Down Expand Up @@ -120,13 +135,15 @@ public void nullValueBytesSchemaTest() throws PulsarClientException {

}

@Test
public void nullValueBooleanSchemaTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/null-value-bool-test";
@Test(dataProvider = "topics")
public void nullValueBooleanSchemaTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Expand All @@ -148,14 +165,16 @@ public void nullValueBooleanSchemaTest() throws PulsarClientException {

}

@Test
public void keyValueNullInlineTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";
@Test(dataProvider = "topics")
public void keyValueNullInlineTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Expand Down Expand Up @@ -193,14 +212,23 @@ public void keyValueNullInlineTest() throws PulsarClientException {

}

@Test
public void keyValueNullSeparatedTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";
@Test(dataProvider = "topics")
public void keyValueNullSeparatedTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
// The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
// SEPARATED so we need to define a message router to guarantee the message order.
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
})
.create();

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ConsumerInterceptors<T> interceptors;
protected final BatchReceivePolicy batchReceivePolicy;
protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
protected static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConsumerBase.class, "incomingMessagesSize");
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
Expand Down Expand Up @@ -847,6 +847,15 @@ protected boolean hasPendingBatchReceive() {
return pendingBatchReceives != null && peekNextBatchReceive() != null;
}

protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}

protected void updateIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
(message.getData() != null) ? message.getData().length : 0);
}
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize
private BatchMessageIdImpl clearReceiverQueue() {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();

if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
Expand Down Expand Up @@ -1522,7 +1522,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
stats.updateNumMsgsReceived(msg);

trackMessage(msg);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
updateIncomingMessageSize(msg);
}

protected void trackMessage(Message<?> msg) {
Expand Down Expand Up @@ -1732,7 +1732,7 @@ public void redeliverUnacknowledgedMessages() {
synchronized (this) {
currentSize = incomingMessages.size();
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
unAckedMessageTracker.clear();
}
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
Expand All @@ -1756,7 +1756,7 @@ public void redeliverUnacknowledgedMessages() {
public int clearIncomingMessagesAndGetMessageNumber() {
int messagesNumber = incomingMessages.size();
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
unAckedMessageTracker.clear();
return messagesNumber;
}
Expand Down Expand Up @@ -1910,7 +1910,7 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
lastDequeuedMessageId = MessageId.earliest;

incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
Expand Down Expand Up @@ -1971,7 +1971,7 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
lastDequeuedMessageId = MessageId.earliest;

incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
Expand Down Expand Up @@ -2216,7 +2216,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
while (message != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
messagesFromQueue++;
MessageIdImpl id = getMessageIdImpl(message);
if (!messageIds.contains(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
@Override
protected synchronized void messageProcessed(Message<?> msg) {
unAckedMessageTracker.add(msg.getMessageId());
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
updateIncomingMessageSize(msg);
}

private void resumeReceivingFromPausedConsumersIfNeeded() {
Expand All @@ -331,7 +331,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
Expand All @@ -347,7 +347,7 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
try {
message = incomingMessages.poll(timeout, unit);
if (message != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
}
Expand Down Expand Up @@ -388,7 +388,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = incomingMessages.poll();
if (msg != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
updateIncomingMessageSize(msg);
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
Expand Down Expand Up @@ -416,7 +416,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
Expand Down Expand Up @@ -622,7 +622,7 @@ public void redeliverUnacknowledgedMessages() {
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
unAckedMessageTracker.clear();
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -691,7 +691,7 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {

unAckedMessageTracker.clear();
incomingMessages.clear();
MultiTopicsConsumerImpl.INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();

FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
if (exception != null) {
Expand Down Expand Up @@ -781,7 +781,7 @@ private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
Message<T> message = incomingMessages.poll();
checkState(message instanceof TopicMessageImpl);
while (message != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
MessageId messageId = message.getMessageId();
if (!messageIds.contains(messageId)) {
messageIds.add(messageId);
Expand Down