Skip to content

Commit

Permalink
continue cherry-pick UnAckedMessageRedeliveryTracker.java
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Nov 15, 2022
1 parent d237f50 commit 23fb813
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,14 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,6 @@ protected Message<T> internalReceive() throws PulsarClientException {
}
}

boolean isValidConsumerEpoch(Message<T> message) {
return isValidConsumerEpoch((MessageImpl<T>) message);
}

@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
Expand Down Expand Up @@ -1921,13 +1917,6 @@ public void redeliverUnacknowledgedMessages() {
}
}

public int clearIncomingMessagesAndGetMessageNumber() {
int messagesNumber = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();
return messagesNumber;
}

@Override
public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
if (messageIds.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public UnAckedMessageRedeliveryTracker(PulsarClientImpl client, ConsumerBase<?>
@Override
public void run(Timeout t) throws Exception {
writeLock.lock();
Set<MessageId> messageIds = null;
try {
HashSet<UnackMessageIdWrapper> headPartition = redeliveryTimePartitions.removeFirst();
if (!headPartition.isEmpty()) {
Expand All @@ -71,9 +72,13 @@ public void run(Timeout t) throws Exception {
}
headPartition.clear();
redeliveryTimePartitions.addLast(headPartition);
triggerRedelivery(consumerBase);
messageIds = getRedeliveryMessages(consumerBase);
} finally {
writeLock.unlock();
if (messageIds != null && !messageIds.isEmpty()) {
consumerBase.onAckTimeoutSend(messageIds);
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
}
}
Expand All @@ -93,35 +98,29 @@ private void addAckTimeoutMessages(UnackMessageIdWrapper messageIdWrapper) {
}
}

private void triggerRedelivery(ConsumerBase<?> consumerBase) {
private Set<MessageId> getRedeliveryMessages(ConsumerBase<?> consumerBase) {
if (ackTimeoutMessages.isEmpty()) {
return;
return null;
}
Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
messageIds.clear();

try {
long now = System.currentTimeMillis();
ackTimeoutMessages.forEach((messageId, timestamp) -> {
if (timestamp <= now) {
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
messageIds.add(messageId);
}
});
if (!messageIds.isEmpty()) {
log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size());
Iterator<MessageId> iterator = messageIds.iterator();
while (iterator.hasNext()) {
MessageId messageId = iterator.next();
ackTimeoutMessages.remove(messageId);
}
long now = System.currentTimeMillis();
ackTimeoutMessages.forEach((messageId, timestamp) -> {
if (timestamp <= now) {
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
messageIds.add(messageId);
}
} finally {
if (messageIds.size() > 0) {
consumerBase.onAckTimeoutSend(messageIds);
consumerBase.redeliverUnacknowledgedMessages(messageIds);
});
if (!messageIds.isEmpty()) {
log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size());
Iterator<MessageId> iterator = messageIds.iterator();
while (iterator.hasNext()) {
MessageId messageId = iterator.next();
ackTimeoutMessages.remove(messageId);
}
}
return messageIds;
}

@Override
Expand Down

0 comments on commit 23fb813

Please sign in to comment.