diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index 41e451f9ed18f..7e3b68a5445be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -30,9 +30,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import lombok.Cleanup; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +68,19 @@ public Object[][] ackReceiptEnabled() { return new Object[][] { { true }, { false } }; } + @DataProvider(name = "batchedMessageAck") + public Object[][] batchedMessageAck() { + // When batch index ack is disabled (by default), only after all single messages were sent would the pending + // ACK be added into the ACK tracker. + return new Object[][] { + // numAcked, batchSize, ack type + { 3, 5, CommandAck.AckType.Individual }, + { 5, 5, CommandAck.AckType.Individual }, + { 3, 5, CommandAck.AckType.Cumulative }, + { 5, 5, CommandAck.AckType.Cumulative } + }; + } + /** * It verifies that redelivered messages are sorted based on the ledger-ids. *
@@ -297,4 +313,57 @@ public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exce
         consumer.close();
         producer.close();
     }
+
+    @Test(timeOut = 30000, dataProvider = "batchedMessageAck")
+    public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackType) throws Exception {
+        String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
+                + numAcked + "-" + batchSize + "-" + ackType.getValue();
+        @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .enableBatchIndexAcknowledgment(false)
+                .acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be sent
+                .subscribe();
+        @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxMessages(batchSize)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        for (int i = 0; i < batchSize; i++) {
+            String value = "msg-" + i;
+            producer.sendAsync(value).thenAccept(id -> log.info("{} was sent to {}", value, id));
+        }
+        List> messages = new ArrayList<>();
+        for (int i = 0; i < batchSize; i++) {
+            messages.add(consumer.receive());
+        }
+        if (ackType == CommandAck.AckType.Individual) {
+            for (int i = 0; i < numAcked; i++) {
+                consumer.acknowledge(messages.get(i));
+            }
+        } else {
+            consumer.acknowledgeCumulative(messages.get(numAcked - 1));
+        }
+
+        consumer.redeliverUnacknowledgedMessages();
+
+        messages.clear();
+        for (int i = 0; i < batchSize; i++) {
+            Message msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            log.info("Received {} from {}", msg.getValue(), msg.getMessageId());
+            messages.add(msg);
+        }
+        List values = messages.stream().map(Message::getValue).collect(Collectors.toList());
+        // All messages are redelivered because only if the whole batch are acknowledged would the message ID be
+        // added into the ACK tracker.
+        if (numAcked < batchSize) {
+            assertEquals(values, IntStream.range(0, batchSize).mapToObj(i -> "msg-" + i).collect(Collectors.toList()));
+        } else {
+            assertTrue(values.isEmpty());
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index 5ef3054d16516..6fc8268ae2165 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -116,23 +116,10 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th
         }
         producer.flush();
 
-        if (batchingEnabled) {
-            for (int i = 0; i < 30; i++) {
-                Message msg = consumer.receive();
-                assertEquals(msg.getValue(), "hello-" + i);
-                consumer.acknowledge(msg);
-            }
-            for (int i = 0; i < 30; i++) {
-                Message msg = consumer.receive();
-                assertEquals(msg.getValue(), "new-message-" + i);
-                consumer.acknowledge(msg);
-            }
-        } else {
-            for (int i = 0; i < 30; i++) {
-                Message msg = consumer.receive();
-                assertEquals(msg.getValue(), "new-message-" + i);
-                consumer.acknowledge(msg);
-            }
+        for (int i = 0; i < 30; i++) {
+            Message msg = consumer.receive();
+            assertEquals(msg.getValue(), "new-message-" + i);
+            consumer.acknowledge(msg);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index b44670c14631f..fef0bcb8906f1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -113,12 +113,18 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum
      */
     @Override
     public boolean isDuplicate(MessageId messageId) {
-        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
-        if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
+        if (!(messageId instanceof MessageIdImpl)) {
+            throw new IllegalArgumentException("isDuplicated cannot accept "
+                    + messageId.getClass().getName() + ": " + messageId);
+        }
+        if (lastCumulativeAck.compareTo(messageId) >= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
-            return pendingIndividualAcks.contains((MessageIdImpl) messageId);
+            final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl)
+                    ? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
+                    : (MessageIdImpl) messageId;
+            return pendingIndividualAcks.contains(messageIdImpl);
         }
     }
 
@@ -622,7 +628,7 @@ protected LastCumulativeAck initialValue() {
     private boolean flushRequired = false;
 
     public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
-        if (messageId.compareTo(this.messageId) > 0) {
+        if (compareTo(messageId) < 0) {
             if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
                 this.bitSetRecyclable.recycle();
             }
@@ -656,6 +662,24 @@ public synchronized void reset() {
         flushRequired = false;
     }
 
+    public synchronized int compareTo(MessageId messageId) {
+        if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
+            final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
+            final MessageIdImpl rhs = (MessageIdImpl) messageId;
+            return MessageIdImpl.messageIdCompare(
+                    lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
+                    rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
+        } else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
+            final MessageIdImpl lhs = this.messageId;
+            final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
+            return MessageIdImpl.messageIdCompare(
+                    lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
+                    rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
+        } else {
+            return this.messageId.compareTo(messageId);
+        }
+    }
+
     private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
         this.messageId = messageId;
         this.bitSetRecyclable = bitSetRecyclable;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
index 245866749e944..27ee9d1dc3648 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -83,4 +83,24 @@ public void testFlush() {
         assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
     }
 
+    @Test
+    public void testCompareTo() {
+        LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);
+
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
+        assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)), 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
+
+        lastCumulativeAck = new LastCumulativeAck();
+        lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);
+
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)) < 0);
+        assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
+        assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 2)) < 0);
+        assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 1)), 0);
+    }
 }