Skip to content

Commit

Permalink
[refactor][client] Remove duplicated and unused code in ACK grouping …
Browse files Browse the repository at this point in the history
…tracker (#18392)


In addition, the `validationError` argument in
`newMessageAckCommandAndWrite` is never used.

### Modifications

Move the `isAckReceiptEnabled` into `ConsumerImpl` and add two methods `acquireReadLock`/`acquireWriteLock` to acquire the read or write lock if ACK receipt is enabled.

Remove the `validationError` argument.
  • Loading branch information
BewareMyPower authored Nov 16, 2022
1 parent 705390f commit 545f33f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2814,6 +2814,12 @@ public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopic
return possibleSendToDeadLetterTopicMessages;
}

boolean isAckReceiptEnabled() {
ClientCnx cnx = getClientCnx();
return conf.isAckReceiptEnabled() && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}

private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand All @@ -44,7 +46,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -66,8 +67,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments

private final long acknowledgementGroupTimeMicros;

private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
private volatile CompletableFuture<Void> currentIndividualAckFuture;
private volatile CompletableFuture<Void> currentCumulativeAckFuture;

private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();

Expand Down Expand Up @@ -125,7 +126,7 @@ public boolean isDuplicate(MessageId messageId) {
public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
AckType ackType, Map<String, Long> properties) {
if (AckType.Cumulative.equals(ackType)) {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
if (consumer.isAckReceiptEnabled()) {
Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
messageIds.forEach(messageId ->
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
Expand All @@ -135,30 +136,20 @@ public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
return CompletableFuture.completedFuture(null);
}
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
if (messageIds.size() != 0) {
addListAcknowledgment(messageIds);
return this.currentIndividualAckFuture;
} else {
return CompletableFuture.completedFuture(null);
}
} finally {
this.lock.readLock().unlock();
if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) {
flush();
}
Optional<Lock> readLock = acquireReadLock();
try {
if (messageIds.size() != 0) {
addListAcknowledgment(messageIds);
return readLock.map(__ -> currentIndividualAckFuture)
.orElse(CompletableFuture.completedFuture(null));
} else {
return CompletableFuture.completedFuture(null);
}
} else {
addListAcknowledgment(messageIds);
} finally {
readLock.ifPresent(Lock::unlock);
if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
}
Expand Down Expand Up @@ -256,26 +247,15 @@ private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<Str
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Individual, properties, null);
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doIndividualAckAsync(messageId);
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualAcks.size() >= maxAckGroupSize) {
flush();
}
}
} else {
Optional<Lock> readLock = acquireReadLock();
try {
doIndividualAckAsync(messageId);
return readLock.map(__ -> currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null));
} finally {
readLock.ifPresent(Lock::unlock);
if (pendingIndividualAcks.size() >= maxAckGroupSize) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
}
Expand All @@ -298,26 +278,15 @@ private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMes
}

private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId) {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doIndividualBatchAckAsync(batchMessageId);
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualBatchIndexAcks.size() >= maxAckGroupSize) {
flush();
}
}
} else {
Optional<Lock> readLock = acquireReadLock();
try {
doIndividualBatchAckAsync(batchMessageId);
return readLock.map(__ -> currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null));
} finally {
readLock.ifPresent(Lock::unlock);
if (pendingIndividualBatchIndexAcks.size() >= maxAckGroupSize) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}

Expand All @@ -329,20 +298,12 @@ private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, Map<Str
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Cumulative, properties, bitSet);
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doCumulativeAckAsync(messageId, bitSet);
return this.currentCumulativeAckFuture;
} finally {
this.lock.readLock().unlock();
}
} else {
Optional<Lock> readLock = acquireReadLock();
try {
doCumulativeAckAsync(messageId, bitSet);
return CompletableFuture.completedFuture(null);
return readLock.map(__ -> currentCumulativeAckFuture).orElse(CompletableFuture.completedFuture(null));
} finally {
readLock.ifPresent(Lock::unlock);
}
}
}
Expand Down Expand Up @@ -415,7 +376,7 @@ private CompletableFuture<Void> doImmediateBatchIndexAck(BatchMessageIdImpl msgI
}

CompletableFuture<Void> completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId,
msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties, true, null, null);
msgId.ledgerId, msgId.entryId, bitSet, ackType, properties, true, null, null);
bitSet.recycle();
return completableFuture;
}
Expand All @@ -434,15 +395,11 @@ public void flush() {
return;
}

if (isAckReceiptEnabled(consumer.getClientCnx())) {
this.lock.writeLock().lock();
try {
flushAsync(cnx);
} finally {
this.lock.writeLock().unlock();
}
} else {
Optional<Lock> writeLock = acquireWriteLock();
try {
flushAsync(cnx);
} finally {
writeLock.ifPresent(Lock::unlock);
}
}

Expand All @@ -453,8 +410,9 @@ private void flushAsync(ClientCnx cnx) {
shouldFlush = true;
final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(),
lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null,
Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative,
Collections.emptyMap(), false,
(TimedCompletableFuture<Void>) this.currentCumulativeAckFuture, null);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
}

Expand Down Expand Up @@ -493,7 +451,7 @@ private void flushAsync(ClientCnx cnx) {
break;
}
newMessageAckCommandAndWrite(cnx, consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(),
null, AckType.Individual, null, Collections.emptyMap(), false,
null, AckType.Individual, Collections.emptyMap(), false,
null, null);
shouldFlush = true;
}
Expand All @@ -514,8 +472,8 @@ private void flushAsync(ClientCnx cnx) {
if (entriesToAck.size() > 0) {

newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
null, AckType.Individual, null, null, true,
currentIndividualAckFuture, entriesToAck);
null, AckType.Individual, null, true,
(TimedCompletableFuture<Void>) currentIndividualAckFuture, entriesToAck);
shouldFlush = true;
}

Expand Down Expand Up @@ -560,31 +518,30 @@ private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, Message
}
}
completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
null, ackType, null, null, true, null, entriesToAck);
null, ackType, null, true, null, entriesToAck);
} else {
// if don't support multi message ack, it also support ack receipt, so we should not think about the
// ack receipt in this logic
for (MessageIdImpl cMsgId : chunkMsgIds) {
newMessageAckCommandAndWrite(cnx, consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(),
bitSet, ackType, null, map, true, null, null);
bitSet, ackType, map, true, null, null);
}
completableFuture = CompletableFuture.completedFuture(null);
}
} else {
completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, msgId.ledgerId, msgId.getEntryId(),
bitSet, ackType, null, map, true, null, null);
bitSet, ackType, map, true, null, null);
}
return completableFuture;
}

private CompletableFuture<Void> newMessageAckCommandAndWrite(
ClientCnx cnx, long consumerId, long ledgerId,
long entryId, BitSetRecyclable ackSet, AckType ackType,
CommandAck.ValidationError validationError,
Map<String, Long> properties, boolean flush,
TimedCompletableFuture<Void> timedCompletableFuture,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
if (consumer.isAckReceiptEnabled()) {
final long requestId = consumer.getClient().newRequestId();
final ByteBuf cmd;
if (entriesToAck == null) {
Expand Down Expand Up @@ -633,9 +590,16 @@ private CompletableFuture<Void> newMessageAckCommandAndWrite(
}
}

private boolean isAckReceiptEnabled(ClientCnx cnx) {
return ackReceiptEnabled && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
public Optional<Lock> acquireReadLock() {
Optional<Lock> optionalLock = Optional.ofNullable(consumer.isAckReceiptEnabled() ? lock.readLock() : null);
optionalLock.ifPresent(Lock::lock);
return optionalLock;
}

public Optional<Lock> acquireWriteLock() {
Optional<Lock> optionalLock = Optional.ofNullable(consumer.isAckReceiptEnabled() ? lock.writeLock() : null);
optionalLock.ifPresent(Lock::lock);
return optionalLock;
}
}

Expand Down

0 comments on commit 545f33f

Please sign in to comment.