Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Nov 15, 2022
1 parent d237f50 commit 07cd25c
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,14 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,10 +945,6 @@ protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messag
Message<T> msg = incomingMessages.poll();
if (msg != null) {
messageProcessed(msg);
if (!isValidConsumerEpoch((MessageImpl<T>) msg)) {
msgPeeked = incomingMessages.peek();
continue;
}
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,6 @@ protected Message<T> internalReceive() throws PulsarClientException {
}
}

boolean isValidConsumerEpoch(Message<T> message) {
return isValidConsumerEpoch((MessageImpl<T>) message);
}

@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
Expand Down Expand Up @@ -1921,13 +1917,6 @@ public void redeliverUnacknowledgedMessages() {
}
}

public int clearIncomingMessagesAndGetMessageNumber() {
int messagesNumber = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();
return messagesNumber;
}

@Override
public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
if (messageIds.isEmpty()) {
Expand Down

0 comments on commit 07cd25c

Please sign in to comment.