Skip to content

Commit

Permalink
fix lock issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Nov 19, 2022
1 parent f37ad7f commit 6228045
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,9 @@ protected boolean canEnqueueMessage(Message<T> message) {

protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
int messageSize = message.size();
synchronized (incomingQueueLock) {
// synchronize redeliverUnacknowledgedMessages().
incomingQueueLock.lock();
try {
if (isValidConsumerEpoch((MessageImpl<T>) message) && canEnqueueMessage(message)
&& incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
Expand All @@ -857,6 +859,8 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
}
} finally {
incomingQueueLock.unlock();
}
return hasEnoughMessagesForBatchReceive();
}
Expand Down

0 comments on commit 6228045

Please sign in to comment.