Skip to content

Commit

Permalink
Check epoch message before offer queue
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Oct 8, 2022
1 parent 27c9dc7 commit 5738b01
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
protected final Object incomingQueueLock = new Object();

protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH =
AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "consumerEpoch");
Expand Down Expand Up @@ -839,12 +840,16 @@ protected boolean canEnqueueMessage(Message<T> message) {

protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
int messageSize = message.size();
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
// anymore, since for pooled messages, this instance was possibly already been released and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
// synchronize redeliverUnacknowledgedMessages()
synchronized (incomingQueueLock) {
if (isValidConsumerEpoch(message) && canEnqueueMessage(message) && incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
// instance anymore, since for pooled messages, this instance was possibly already been released
// and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
}
}
return hasEnoughMessagesForBatchReceive();
}
Expand Down Expand Up @@ -948,10 +953,6 @@ protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messag
Message<T> msg = incomingMessages.poll();
if (msg != null) {
messageProcessed(msg);
if (!isValidConsumerEpoch(msg)) {
msgPeeked = incomingMessages.peek();
continue;
}
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,6 @@ protected Message<T> internalReceive() throws PulsarClientException {
}
message = incomingMessages.take();
messageProcessed(message);
if (!isValidConsumerEpoch(message)) {
return internalReceive();
}
return beforeConsume(message);
} catch (InterruptedException e) {
stats.incrementNumReceiveFailed();
Expand All @@ -479,11 +476,6 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
messageProcessed(message);
if (!isValidConsumerEpoch(message)) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
return;
}
result.complete(beforeConsume(message));
}
});
Expand All @@ -494,7 +486,6 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
@Override
protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarClientException {
Message<T> message;
long callTime = System.nanoTime();
try {
if (incomingMessages.isEmpty()) {
expectMoreIncomingMessages();
Expand All @@ -504,15 +495,6 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
return null;
}
messageProcessed(message);
if (!isValidConsumerEpoch(message)) {
long executionTime = System.nanoTime() - callTime;
long timeoutInNanos = unit.toNanos(timeout);
if (executionTime >= timeoutInNanos) {
return null;
} else {
return internalReceive(timeoutInNanos - executionTime, TimeUnit.NANOSECONDS);
}
}
return beforeConsume(message);
} catch (InterruptedException e) {
State state = getState();
Expand Down Expand Up @@ -1898,18 +1880,22 @@ public void redeliverUnacknowledgedMessages() {
return;
}

// clear local message
int currentSize = 0;
currentSize = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();
int currentSize;
synchronized (incomingQueueLock) {
// we should increase epoch every time, because MultiTopicsConsumerImpl also increase it,
// we need to keep both epochs the same
if (conf.getSubscriptionType() == SubscriptionType.Failover
|| conf.getSubscriptionType() == SubscriptionType.Exclusive) {
CONSUMER_EPOCH.incrementAndGet(this);
}

// clear local message
currentSize = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();

// we should increase epoch every time, because MultiTopicsConsumerImpl also increase it,
// we need to keep both epochs the same
if (conf.getSubscriptionType() == SubscriptionType.Failover
|| conf.getSubscriptionType() == SubscriptionType.Exclusive) {
CONSUMER_EPOCH.incrementAndGet(this);
}

// is channel is connected, we should send redeliver command to broker
if (cnx != null && isConnected(cnx)) {
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
Expand All @@ -1928,13 +1914,6 @@ public void redeliverUnacknowledgedMessages() {
}
}

public int clearIncomingMessagesAndGetMessageNumber() {
int messagesNumber = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();
return messagesNumber;
}

@Override
public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
if (messageIds.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,6 @@ protected Message<T> internalReceive() throws PulsarClientException {
message = incomingMessages.take();
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
if (!isValidConsumerEpoch(message)) {
resumeReceivingFromPausedConsumersIfNeeded();
message.release();
return internalReceive();
}
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
Expand All @@ -380,8 +375,6 @@ protected Message<T> internalReceive() throws PulsarClientException {
@Override
protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarClientException {
Message<T> message;

long callTime = System.nanoTime();
try {
if (incomingMessages.isEmpty()) {
expectMoreIncomingMessages();
Expand All @@ -390,16 +383,6 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
if (message != null) {
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
if (!isValidConsumerEpoch(message)) {
long executionTime = System.nanoTime() - callTime;
long timeoutInNanos = unit.toNanos(timeout);
if (executionTime >= timeoutInNanos) {
return null;
} else {
resumeReceivingFromPausedConsumersIfNeeded();
return internalReceive(timeoutInNanos - executionTime, TimeUnit.NANOSECONDS);
}
}
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
}
resumeReceivingFromPausedConsumersIfNeeded();
Expand Down Expand Up @@ -683,13 +666,15 @@ private ConsumerConfigurationData<T> getInternalConsumerConfig() {
@Override
public void redeliverUnacknowledgedMessages() {
internalPinnedExecutor.execute(() -> {
CONSUMER_EPOCH.incrementAndGet(this);
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();
synchronized (incomingQueueLock) {
CONSUMER_EPOCH.incrementAndGet(this);
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();
}
});
resumeReceivingFromPausedConsumersIfNeeded();
}
Expand Down

0 comments on commit 5738b01

Please sign in to comment.