Skip to content

Commit

Permalink
Only work lock when subType != shared
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Oct 8, 2022
1 parent 5738b01 commit f2d6b47
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.NoOpLock;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -96,7 +97,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
protected final Object incomingQueueLock = new Object();

// Only work when subscription type is Failover or Exclusive
protected final Lock incomingQueueLock;

protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH =
AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "consumerEpoch");
Expand Down Expand Up @@ -163,6 +166,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
} else {
this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
}
if (getSubType() == CommandSubscribe.SubType.Failover || getSubType() == CommandSubscribe.SubType.Exclusive) {
incomingQueueLock = new ReentrantLock();
} else {
incomingQueueLock = new NoOpLock();
}

initReceiverQueueSize();
}
Expand Down Expand Up @@ -840,8 +848,9 @@ protected boolean canEnqueueMessage(Message<T> message) {

protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
int messageSize = message.size();
// synchronize redeliverUnacknowledgedMessages()
synchronized (incomingQueueLock) {
// synchronize redeliverUnacknowledgedMessages().
incomingQueueLock.lock();
try {
if (isValidConsumerEpoch(message) && canEnqueueMessage(message) && incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
// instance anymore, since for pooled messages, this instance was possibly already been released
Expand All @@ -850,6 +859,8 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
}
} finally {
incomingQueueLock.unlock();
}
return hasEnoughMessagesForBatchReceive();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1881,7 +1881,8 @@ public void redeliverUnacknowledgedMessages() {
}

int currentSize;
synchronized (incomingQueueLock) {
incomingQueueLock.lock();
try {
// we should increase epoch every time, because MultiTopicsConsumerImpl also increase it,
// we need to keep both epochs the same
if (conf.getSubscriptionType() == SubscriptionType.Failover
Expand All @@ -1893,7 +1894,8 @@ public void redeliverUnacknowledgedMessages() {
currentSize = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();

} finally {
incomingQueueLock.unlock();
}

// is channel is connected, we should send redeliver command to broker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,14 +666,17 @@ private ConsumerConfigurationData<T> getInternalConsumerConfig() {
@Override
public void redeliverUnacknowledgedMessages() {
internalPinnedExecutor.execute(() -> {
synchronized (incomingQueueLock) {
incomingQueueLock.lock();
try {
CONSUMER_EPOCH.incrementAndGet(this);
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();
} finally {
incomingQueueLock.unlock();
}
});
resumeReceivingFromPausedConsumersIfNeeded();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public final class NoOpLock implements Lock {
public static final NoOpLock INSTANCE = new NoOpLock();

public void lock() {
}

public void lockInterruptibly() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}

public boolean tryLock() {
return true;
}

public boolean tryLock(long l, TimeUnit tu) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
} else {
return true;
}
}

public void unlock() {
}

public Condition newCondition() {
throw new UnsupportedOperationException();
}
}

0 comments on commit f2d6b47

Please sign in to comment.