From 99e667474b52604093acf08b03059f4091235eff Mon Sep 17 00:00:00 2001 From: mukesh-ctds Date: Tue, 16 Apr 2024 19:43:23 +0530 Subject: [PATCH] Revert "[fix] [ml] Reader can set read-pos to a deleted ledger (#21248)" This reverts commit dd28bb41f44de7d88aec39c79cc8014d64ad4476. --- .../mledger/impl/ManagedCursorImpl.java | 3 - .../mledger/impl/ManagedLedgerImpl.java | 21 +- .../mledger/impl/NonDurableCursorImpl.java | 2 +- .../api/NonDurableSubscriptionTest.java | 327 ------------------ 4 files changed, 8 insertions(+), 345 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index ccea125ef83f6..b910fe9f3cb1a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1121,9 +1121,6 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) { messagesConsumedCounter, markDeletePosition, readPosition); } if (isPrecise) { - if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) { - return 0; - } return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 32ac345629d32..9dcf398037cee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3694,30 +3694,23 @@ public PositionImpl getPreviousPosition(PositionImpl position) { * @return true if the position is valid, false otherwise */ public boolean isValidPosition(PositionImpl position) { - PositionImpl lac = lastConfirmedEntry; + PositionImpl last = lastConfirmedEntry; if (log.isDebugEnabled()) { - log.debug("IsValid position: {} -- last: {}", position, lac); + log.debug("IsValid position: {} -- last: {}", position, last); } - if (!ledgers.containsKey(position.getLedgerId())){ + if (position.getEntryId() < 0) { return false; - } else if (position.getEntryId() < 0) { + } else if (position.getLedgerId() > last.getLedgerId()) { return false; - } else if (currentLedger != null && position.getLedgerId() == currentLedger.getId()) { - // If current ledger is empty, the largest read position can be "{current_ledger: 0}". - // Else, the read position can be set to "{LAC + 1}" when subscribe at LATEST, - return (position.getLedgerId() == lac.getLedgerId() && position.getEntryId() <= lac.getEntryId() + 1) - || position.getEntryId() == 0; - } else if (position.getLedgerId() == lac.getLedgerId()) { - // The ledger witch maintains LAC was closed, and there is an empty current ledger. - // If entry id is larger than LAC, it should be "{current_ledger: 0}". - return position.getEntryId() <= lac.getEntryId(); + } else if (position.getLedgerId() == last.getLedgerId()) { + return position.getEntryId() <= (last.getEntryId() + 1); } else { // Look in the ledgers map LedgerInfo ls = ledgers.get(position.getLedgerId()); if (ls == null) { - if (position.getLedgerId() < lac.getLedgerId()) { + if (position.getLedgerId() < last.getLedgerId()) { // Pointing to a non-existing ledger that is older than the current ledger is invalid return false; } else { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 77216ce2e4588..51e56158cad55 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -70,7 +70,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl { private void recoverCursor(PositionImpl mdPosition) { Pair lastEntryAndCounter = ledger.getLastPositionAndCounter(); this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition); - markDeletePosition = ledger.getPreviousPosition(this.readPosition); + markDeletePosition = mdPosition; // Initialize the counter such that the difference between the messages written on the ML and the // messagesConsumed is equal to the current backlog (negated). diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java index 08b0777b640fc..522d494e0fb6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java @@ -24,8 +24,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.AssertJUnit.assertTrue; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -33,8 +31,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; @@ -44,8 +40,6 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.CommandFlow; -import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; -import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; @@ -347,325 +341,4 @@ public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception { producer.close(); admin.topics().delete(topicName); } - - @Test - public void testInitReaderAtSpecifiedPosition() throws Exception { - String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(topicName); - admin.topics().createSubscription(topicName, "s0", MessageId.earliest); - - // Trigger 5 ledgers. - ArrayList ledgers = new ArrayList<>(); - Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); - for (int i = 0; i < 5; i++) { - MessageIdImpl msgId = (MessageIdImpl) producer.send("1"); - ledgers.add(msgId.getLedgerId()); - admin.topics().unload(topicName); - } - producer.close(); - PersistentTopic persistentTopic = - (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); - ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); - LedgerHandle currentLedger = WhiteboxImpl.getInternalState(ml, "currentLedger"); - log.info("currentLedger: {}", currentLedger.getId()); - - // Less than the first ledger, and entry id is "-1". - log.info("start test s1"); - String s1 = "s1"; - MessageIdImpl startMessageId1 = new MessageIdImpl(ledgers.get(0) - 1, -1, -1); - Reader reader1 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s1) - .receiverQueueSize(0).startMessageId(startMessageId1).create(); - ManagedLedgerInternalStats.CursorStats cursor1 = admin.topics().getInternalStats(topicName).cursors.get(s1); - log.info("cursor1 readPosition: {}, markDeletedPosition: {}", cursor1.readPosition, cursor1.markDeletePosition); - PositionImpl p1 = parseReadPosition(cursor1); - assertEquals(p1.getLedgerId(), ledgers.get(0)); - assertEquals(p1.getEntryId(), 0); - reader1.close(); - - // Less than the first ledger, and entry id is Long.MAX_VALUE. - log.info("start test s2"); - String s2 = "s2"; - MessageIdImpl startMessageId2 = new MessageIdImpl(ledgers.get(0) - 1, Long.MAX_VALUE, -1); - Reader reader2 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s2) - .receiverQueueSize(0).startMessageId(startMessageId2).create(); - ManagedLedgerInternalStats.CursorStats cursor2 = admin.topics().getInternalStats(topicName).cursors.get(s2); - log.info("cursor2 readPosition: {}, markDeletedPosition: {}", cursor2.readPosition, cursor2.markDeletePosition); - PositionImpl p2 = parseReadPosition(cursor2); - assertEquals(p2.getLedgerId(), ledgers.get(0)); - assertEquals(p2.getEntryId(), 0); - reader2.close(); - - // Larger than the latest ledger, and entry id is "-1". - log.info("start test s3"); - String s3 = "s3"; - MessageIdImpl startMessageId3 = new MessageIdImpl(currentLedger.getId() + 1, -1, -1); - Reader reader3 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s3) - .receiverQueueSize(0).startMessageId(startMessageId3).create(); - ManagedLedgerInternalStats.CursorStats cursor3 = admin.topics().getInternalStats(topicName).cursors.get(s3); - log.info("cursor3 readPosition: {}, markDeletedPosition: {}", cursor3.readPosition, cursor3.markDeletePosition); - PositionImpl p3 = parseReadPosition(cursor3); - assertEquals(p3.getLedgerId(), currentLedger.getId()); - assertEquals(p3.getEntryId(), 0); - reader3.close(); - - // Larger than the latest ledger, and entry id is Long.MAX_VALUE. - log.info("start test s4"); - String s4 = "s4"; - MessageIdImpl startMessageId4 = new MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1); - Reader reader4 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s4) - .receiverQueueSize(0).startMessageId(startMessageId4).create(); - ManagedLedgerInternalStats.CursorStats cursor4 = admin.topics().getInternalStats(topicName).cursors.get(s4); - log.info("cursor4 readPosition: {}, markDeletedPosition: {}", cursor4.readPosition, cursor4.markDeletePosition); - PositionImpl p4 = parseReadPosition(cursor4); - assertEquals(p4.getLedgerId(), currentLedger.getId()); - assertEquals(p4.getEntryId(), 0); - reader4.close(); - - // Ledger id and entry id both are Long.MAX_VALUE. - log.info("start test s5"); - String s5 = "s5"; - MessageIdImpl startMessageId5 = new MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1); - Reader reader5 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s5) - .receiverQueueSize(0).startMessageId(startMessageId5).create(); - ManagedLedgerInternalStats.CursorStats cursor5 = admin.topics().getInternalStats(topicName).cursors.get(s5); - log.info("cursor5 readPosition: {}, markDeletedPosition: {}", cursor5.readPosition, cursor5.markDeletePosition); - PositionImpl p5 = parseReadPosition(cursor5); - assertEquals(p5.getLedgerId(), currentLedger.getId()); - assertEquals(p5.getEntryId(), 0); - reader5.close(); - - // Ledger id equals LAC, and entry id is "-1". - log.info("start test s6"); - String s6 = "s6"; - MessageIdImpl startMessageId6 = new MessageIdImpl(ledgers.get(ledgers.size() - 1), -1, -1); - Reader reader6 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s6) - .receiverQueueSize(0).startMessageId(startMessageId6).create(); - ManagedLedgerInternalStats.CursorStats cursor6 = admin.topics().getInternalStats(topicName).cursors.get(s6); - log.info("cursor6 readPosition: {}, markDeletedPosition: {}", cursor6.readPosition, cursor6.markDeletePosition); - PositionImpl p6 = parseReadPosition(cursor6); - assertEquals(p6.getLedgerId(), ledgers.get(ledgers.size() - 1)); - assertEquals(p6.getEntryId(), 0); - reader6.close(); - - // Larger than the latest ledger, and entry id is Long.MAX_VALUE. - log.info("start test s7"); - String s7 = "s7"; - MessageIdImpl startMessageId7 = new MessageIdImpl(ledgers.get(ledgers.size() - 1), Long.MAX_VALUE, -1); - Reader reader7 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s7) - .receiverQueueSize(0).startMessageId(startMessageId7).create(); - ManagedLedgerInternalStats.CursorStats cursor7 = admin.topics().getInternalStats(topicName).cursors.get(s7); - log.info("cursor7 readPosition: {}, markDeletedPosition: {}", cursor7.readPosition, cursor7.markDeletePosition); - PositionImpl p7 = parseReadPosition(cursor7); - assertEquals(p7.getLedgerId(), currentLedger.getId()); - assertEquals(p7.getEntryId(), 0); - reader7.close(); - - // A middle ledger id, and entry id is "-1". - log.info("start test s8"); - String s8 = "s8"; - MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0, -1); - Reader reader8 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8) - .receiverQueueSize(0).startMessageId(startMessageId8).create(); - ManagedLedgerInternalStats.CursorStats cursor8 = admin.topics().getInternalStats(topicName).cursors.get(s8); - log.info("cursor8 readPosition: {}, markDeletedPosition: {}", cursor8.readPosition, cursor8.markDeletePosition); - PositionImpl p8 = parseReadPosition(cursor8); - assertEquals(p8.getLedgerId(), ledgers.get(2)); - assertEquals(p8.getEntryId(), 0); - reader8.close(); - - // Larger than the latest ledger, and entry id is Long.MAX_VALUE. - log.info("start test s9"); - String s9 = "s9"; - MessageIdImpl startMessageId9 = new MessageIdImpl(ledgers.get(2), Long.MAX_VALUE, -1); - Reader reader9 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s9) - .receiverQueueSize(0).startMessageId(startMessageId9).create(); - ManagedLedgerInternalStats.CursorStats cursor9 = admin.topics().getInternalStats(topicName).cursors.get(s9); - log.info("cursor9 readPosition: {}, markDeletedPosition: {}", cursor9.readPosition, - cursor9.markDeletePosition); - PositionImpl p9 = parseReadPosition(cursor9); - assertEquals(p9.getLedgerId(), ledgers.get(3)); - assertEquals(p9.getEntryId(), 0); - reader9.close(); - - // Larger than the latest ledger, and entry id equals with the max entry id of this ledger. - log.info("start test s10"); - String s10 = "s10"; - MessageIdImpl startMessageId10 = new MessageIdImpl(ledgers.get(2), 0, -1); - Reader reader10 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s10) - .receiverQueueSize(0).startMessageId(startMessageId10).create(); - ManagedLedgerInternalStats.CursorStats cursor10 = admin.topics().getInternalStats(topicName).cursors.get(s10); - log.info("cursor10 readPosition: {}, markDeletedPosition: {}", cursor10.readPosition, cursor10.markDeletePosition); - PositionImpl p10 = parseReadPosition(cursor10); - assertEquals(p10.getLedgerId(), ledgers.get(2)); - assertEquals(p10.getEntryId(), 0); - reader10.close(); - - // cleanup - admin.topics().delete(topicName, false); - } - - private PositionImpl parseReadPosition(ManagedLedgerInternalStats.CursorStats cursorStats) { - String[] ledgerIdAndEntryId = cursorStats.readPosition.split(":"); - return PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); - } - - @Test - public void testReaderInitAtDeletedPosition() throws Exception { - String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(topicName); - Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); - producer.send("1"); - producer.send("2"); - producer.send("3"); - MessageIdImpl msgIdInDeletedLedger4 = (MessageIdImpl) producer.send("4"); - MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5"); - - // Trigger a trim ledgers task, and verify trim ledgers successful. - admin.topics().unload(topicName); - trimLedgers(topicName); - List ledgers = admin.topics().getInternalStats(topicName).ledgers; - assertEquals(ledgers.size(), 1); - assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId()); - - // Start a reader at a deleted ledger. - MessageIdImpl startMessageId = - new MessageIdImpl(msgIdInDeletedLedger4.getLedgerId(), msgIdInDeletedLedger4.getEntryId(), -1); - Reader reader = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName("s1") - .startMessageId(startMessageId).create(); - Message msg1 = reader.readNext(2, TimeUnit.SECONDS); - Assert.assertNull(msg1); - - // Verify backlog and markDeletePosition is correct. - Awaitility.await().untilAsserted(() -> { - SubscriptionStats subscriptionStats = admin.topics() - .getStats(topicName, true, true, true).getSubscriptions().get("s1"); - log.info("backlog size: {}", subscriptionStats.getMsgBacklog()); - assertEquals(subscriptionStats.getMsgBacklog(), 0); - ManagedLedgerInternalStats.CursorStats cursorStats = - admin.topics().getInternalStats(topicName).cursors.get("s1"); - String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":"); - PositionImpl actMarkDeletedPos = - PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); - PositionImpl expectedMarkDeletedPos = - PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); - log.info("Expected mark deleted position: {}", expectedMarkDeletedPos); - log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition); - assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); - }); - - // cleanup. - reader.close(); - producer.close(); - admin.topics().delete(topicName, false); - } - - @Test - public void testTrimLedgerIfNoDurableCursor() throws Exception { - final String nonDurableCursor = "non-durable-cursor"; - final String durableCursor = "durable-cursor"; - final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(topicName); - Reader reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1) - .subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create(); - Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1) - .subscriptionName(durableCursor).subscribe(); - consumer.close(); - - Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); - producer.send("1"); - producer.send("2"); - producer.send("3"); - producer.send("4"); - MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5"); - - Message msg1 = reader.readNext(2, TimeUnit.SECONDS); - assertEquals(msg1.getValue(), "1"); - Message msg2 = reader.readNext(2, TimeUnit.SECONDS); - assertEquals(msg2.getValue(), "2"); - Message msg3 = reader.readNext(2, TimeUnit.SECONDS); - assertEquals(msg3.getValue(), "3"); - - // Unsubscribe durable cursor. - // Trigger a trim ledgers task, and verify trim ledgers successful. - admin.topics().unload(topicName); - Thread.sleep(3 * 1000); - admin.topics().deleteSubscription(topicName, durableCursor); - // Trim ledgers after release durable cursor. - trimLedgers(topicName); - List ledgers = admin.topics().getInternalStats(topicName).ledgers; - assertEquals(ledgers.size(), 1); - assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId()); - - // Verify backlog and markDeletePosition is correct. - Awaitility.await().untilAsserted(() -> { - SubscriptionStats subscriptionStats = admin.topics().getStats(topicName, true, true, true) - .getSubscriptions().get(nonDurableCursor); - log.info("backlog size: {}", subscriptionStats.getMsgBacklog()); - assertEquals(subscriptionStats.getMsgBacklog(), 0); - ManagedLedgerInternalStats.CursorStats cursorStats = - admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor); - String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":"); - PositionImpl actMarkDeletedPos = - PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1])); - PositionImpl expectedMarkDeletedPos = - PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId()); - log.info("Expected mark deleted position: {}", expectedMarkDeletedPos); - log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition); - Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0); - }); - - // Clear the incoming queue of the reader for next test. - while (true) { - Message msg = reader.readNext(2, TimeUnit.SECONDS); - if (msg == null) { - break; - } - log.info("clear msg: {}", msg.getValue()); - } - - // The following tests are designed to verify the api "getNumberOfEntries" and "consumedEntries" still work - // after changes.See the code-description added with the PR https://github.com/apache/pulsar/pull/10667. - PersistentTopic persistentTopic = - (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); - ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); - ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(nonDurableCursor); - - // Verify "getNumberOfEntries" if there is no entries to consume. - assertEquals(0, cursor.getNumberOfEntries()); - assertEquals(0, ml.getNumberOfEntries()); - - // Verify "getNumberOfEntries" if there is 1 entry to consume. - producer.send("6"); - producer.send("7"); - Awaitility.await().untilAsserted(() -> { - assertEquals(2, ml.getNumberOfEntries()); - // Since there is one message has been pulled into the incoming queue of reader. There is only one messages - // waiting to cursor read. - assertEquals(1, cursor.getNumberOfEntries()); - }); - - // Verify "consumedEntries" is correct. - ManagedLedgerInternalStats.CursorStats cursorStats = - admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor); - // "messagesConsumedCounter" should be 0 after unload the topic. - // Note: "topic_internal_stat.cursor.messagesConsumedCounter" means how many messages were acked on this - // cursor. The similar one "topic_stats.lastConsumedTimestamp" means the last time of sending messages to - // the consumer. - assertEquals(0, cursorStats.messagesConsumedCounter); - Message msg6 = reader.readNext(2, TimeUnit.SECONDS); - assertEquals(msg6.getValue(), "6"); - Message msg7 = reader.readNext(2, TimeUnit.SECONDS); - assertEquals(msg7.getValue(), "7"); - Awaitility.await().untilAsserted(() -> { - // "messagesConsumedCounter" should be 2 after consumed 2 message. - ManagedLedgerInternalStats.CursorStats cStat = - admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor); - assertEquals(2, cStat.messagesConsumedCounter); - }); - - // cleanup. - reader.close(); - producer.close(); - admin.topics().delete(topicName, false); - } }