Skip to content

Commit

Permalink
Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113
Browse files Browse the repository at this point in the history
)

### Motivation

[#6379](#6379) introduced the feature to handle null value messages, but it only checks the null value in `ConsumerImpl` when `INCOMING_MESSAGES_SIZE_UPDATER` is updated. Therefore, if a partitioned topic with at least 2 partitions was consumed with a null value message, the NPE would be thrown.

### Modifications

- Check the null value message in `MultiTopicsConsumerImpl` as well as `ConsumerImpl`. To reduce repeated code, two protected methods are added to `ConsumerBase` and `INCOMING_MESSAGES_SIZE_UPDATER` becomes private now, the derived consumer classes just use these two methods to update or reset `INCOMING_MESSAGES_SIZE_UPDATER`.
- Add tests for partitioned topics in `NullValueTest`. Since the existed tests rely on the message send order, here we only send messages to a single partition only.
  • Loading branch information
BewareMyPower authored Jan 4, 2021
1 parent 73e0dbd commit dd3b9d8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand All @@ -52,13 +57,23 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void nullValueBytesSchemaTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/null-value-bytes-test";
@DataProvider(name = "topics")
public static Object[][] topics() {
return new Object[][]{
{"persistent://prop/ns-abc/null-value-test-0", 1},
{"persistent://prop/ns-abc/null-value-test-1", 3},
};
}

@Test(dataProvider = "topics")
public void nullValueBytesSchemaTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Expand Down Expand Up @@ -120,13 +135,15 @@ public void nullValueBytesSchemaTest() throws PulsarClientException {

}

@Test
public void nullValueBooleanSchemaTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/null-value-bool-test";
@Test(dataProvider = "topics")
public void nullValueBooleanSchemaTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Expand All @@ -148,14 +165,16 @@ public void nullValueBooleanSchemaTest() throws PulsarClientException {

}

@Test
public void keyValueNullInlineTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";
@Test(dataProvider = "topics")
public void keyValueNullInlineTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Expand Down Expand Up @@ -193,14 +212,23 @@ public void keyValueNullInlineTest() throws PulsarClientException {

}

@Test
public void keyValueNullSeparatedTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";
@Test(dataProvider = "topics")
public void keyValueNullSeparatedTest(String topic, int partitions)
throws PulsarClientException, PulsarAdminException {
admin.topics().createPartitionedTopic(topic, partitions);

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
// The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
// SEPARATED so we need to define a message router to guarantee the message order.
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
})
.create();

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ConsumerInterceptors<T> interceptors;
protected final BatchReceivePolicy batchReceivePolicy;
protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
protected static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConsumerBase.class, "incomingMessagesSize");
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
Expand Down Expand Up @@ -847,6 +847,15 @@ protected boolean hasPendingBatchReceive() {
return pendingBatchReceives != null && peekNextBatchReceive() != null;
}

protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}

protected void updateIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
(message.getData() != null) ? message.getData().length : 0);
}

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize
private BatchMessageIdImpl clearReceiverQueue() {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();

if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
Expand Down Expand Up @@ -1522,7 +1522,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
stats.updateNumMsgsReceived(msg);

trackMessage(msg);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
updateIncomingMessageSize(msg);
}

protected void trackMessage(Message<?> msg) {
Expand Down Expand Up @@ -1732,7 +1732,7 @@ public void redeliverUnacknowledgedMessages() {
synchronized (this) {
currentSize = incomingMessages.size();
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
unAckedMessageTracker.clear();
}
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
Expand All @@ -1756,7 +1756,7 @@ public void redeliverUnacknowledgedMessages() {
public int clearIncomingMessagesAndGetMessageNumber() {
int messagesNumber = incomingMessages.size();
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
unAckedMessageTracker.clear();
return messagesNumber;
}
Expand Down Expand Up @@ -1910,7 +1910,7 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
lastDequeuedMessageId = MessageId.earliest;

incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
Expand Down Expand Up @@ -1971,7 +1971,7 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
lastDequeuedMessageId = MessageId.earliest;

incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
Expand Down Expand Up @@ -2216,7 +2216,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
while (message != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
messagesFromQueue++;
MessageIdImpl id = getMessageIdImpl(message);
if (!messageIds.contains(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
@Override
protected synchronized void messageProcessed(Message<?> msg) {
unAckedMessageTracker.add(msg.getMessageId());
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
updateIncomingMessageSize(msg);
}

private void resumeReceivingFromPausedConsumersIfNeeded() {
Expand All @@ -331,7 +331,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
Expand All @@ -347,7 +347,7 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
try {
message = incomingMessages.poll(timeout, unit);
if (message != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
}
Expand Down Expand Up @@ -388,7 +388,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = incomingMessages.poll();
if (msg != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
updateIncomingMessageSize(msg);
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
Expand Down Expand Up @@ -416,7 +416,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
Expand Down Expand Up @@ -622,7 +622,7 @@ public void redeliverUnacknowledgedMessages() {
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();
unAckedMessageTracker.clear();
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -691,7 +691,7 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {

unAckedMessageTracker.clear();
incomingMessages.clear();
MultiTopicsConsumerImpl.INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
resetIncomingMessageSize();

FutureUtil.waitForAll(futures).whenComplete((result, exception) -> {
if (exception != null) {
Expand Down Expand Up @@ -781,7 +781,7 @@ private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
Message<T> message = incomingMessages.poll();
checkState(message instanceof TopicMessageImpl);
while (message != null) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length);
updateIncomingMessageSize(message);
MessageId messageId = message.getMessageId();
if (!messageIds.contains(messageId)) {
messageIds.add(messageId);
Expand Down

0 comments on commit dd3b9d8

Please sign in to comment.