Skip to content

Commit

Permalink
[fix] [ml] fix wrong msg backlog of non-durable cursor after trim led…
Browse files Browse the repository at this point in the history
…gers (apache#21250)

### Background
- But after trimming ledgers, `ml.lastConfirmedPosition` relies on a deleted ledger when the current ledger of ML is empty. 
- Cursor prevents setting `markDeletedPosition` to a value larger than `ml.lastConfirmedPosition`, but there are no entries to read<sup>[1]</sup>.
- The code description in the method `advanceCursors` said: do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition`<sup>[2]</sup>

### Issue
If there is no durable cursor, the `markDeletedPosition` might be set to `{current_ledger, -1}`, and `async mark delete` will be prevented by the `rule-2` above. So he `backlog`, `readPosition`, and `markDeletedPosition` of the cursor will be in an incorrect position after trimming the ledger. You can reproduce it by the test `testTrimLedgerIfNoDurableCursor`

### Modifications
Do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition` when advancing non-durable cursors.
  • Loading branch information
poorbarcode authored and vinayakmalik12 committed Oct 12, 2023
1 parent 4ad992f commit bdfeca1
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2850,15 +2850,14 @@ void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) throws LedgerNo
return;
}

// need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
// calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
// incorrect results
Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
if (firstNonDeletedLedger == null) {
throw new LedgerNotExistException("First non deleted Ledger is not found");
// Just ack messages like a consumer. Normally, consumers will not confirm a position that does not exist, so
// find the latest existing position to ack.
PositionImpl highestPositionToDelete = calculateLastEntryInLedgerList(ledgersToDelete);
if (highestPositionToDelete == null) {
log.warn("[{}] The ledgers to be trim are all empty, skip to advance non-durable cursors: {}",
name, ledgersToDelete);
return;
}
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);

cursors.forEach(cursor -> {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
// to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be
Expand All @@ -2882,6 +2881,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
});
}

/**
* @return null if all ledgers is empty.
*/
private PositionImpl calculateLastEntryInLedgerList(List<LedgerInfo> ledgersToDelete) {
for (int i = ledgersToDelete.size() - 1; i >= 0; i--) {
LedgerInfo ledgerInfo = ledgersToDelete.get(i);
if (ledgerInfo != null && ledgerInfo.hasEntries() && ledgerInfo.getEntries() > 0) {
return PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
}
}
return null;
}

/**
* Delete this ManagedLedger completely from the system.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -55,7 +56,7 @@

@Test(groups = "broker-api")
@Slf4j
public class NonDurableSubscriptionTest extends ProducerConsumerBase {
public class NonDurableSubscriptionTest extends ProducerConsumerBase {

private final AtomicInteger numFlow = new AtomicInteger(0);

Expand Down Expand Up @@ -316,7 +317,7 @@ private void switchLedgerManually(final String tpName) throws Exception {
}

@Test
public void testTrimLedgerIfNoDurableCursor() throws Exception {
public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception {
final String nonDurableCursor = "non-durable-cursor";
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
Expand Down Expand Up @@ -557,4 +558,114 @@ public void testReaderInitAtDeletedPosition() throws Exception {
producer.close();
admin.topics().delete(topicName, false);
}

@Test
public void testTrimLedgerIfNoDurableCursor() throws Exception {
final String nonDurableCursor = "non-durable-cursor";
final String durableCursor = "durable-cursor";
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(topicName);
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1)
.subscriptionName(durableCursor).subscribe();
consumer.close();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
producer.send("1");
producer.send("2");
producer.send("3");
producer.send("4");
MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5");

Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg1.getValue(), "1");
Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg2.getValue(), "2");
Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg3.getValue(), "3");

// Unsubscribe durable cursor.
// Trigger a trim ledgers task, and verify trim ledgers successful.
admin.topics().unload(topicName);
Thread.sleep(3 * 1000);
admin.topics().deleteSubscription(topicName, durableCursor);
// Trim ledgers after release durable cursor.
trimLedgers(topicName);
List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topicName).ledgers;
assertEquals(ledgers.size(), 1);
assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId());

// Verify backlog and markDeletePosition is correct.
Awaitility.await().untilAsserted(() -> {
SubscriptionStats subscriptionStats = admin.topics().getStats(topicName, true, true, true)
.getSubscriptions().get(nonDurableCursor);
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
PositionImpl actMarkDeletedPos =
PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
PositionImpl expectedMarkDeletedPos =
PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
});

// Clear the incoming queue of the reader for next test.
while (true) {
Message<String> msg = reader.readNext(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
log.info("clear msg: {}", msg.getValue());
}

// The following tests are designed to verify the api "getNumberOfEntries" and "consumedEntries" still work
// after changes.See the code-description added with the PR https://github.com/apache/pulsar/pull/10667.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(nonDurableCursor);

// Verify "getNumberOfEntries" if there is no entries to consume.
assertEquals(0, cursor.getNumberOfEntries());
assertEquals(0, ml.getNumberOfEntries());

// Verify "getNumberOfEntries" if there is 1 entry to consume.
producer.send("6");
producer.send("7");
Awaitility.await().untilAsserted(() -> {
assertEquals(2, ml.getNumberOfEntries());
// Since there is one message has been pulled into the incoming queue of reader. There is only one messages
// waiting to cursor read.
assertEquals(1, cursor.getNumberOfEntries());
});

// Verify "consumedEntries" is correct.
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
// "messagesConsumedCounter" should be 0 after unload the topic.
// Note: "topic_internal_stat.cursor.messagesConsumedCounter" means how many messages were acked on this
// cursor. The similar one "topic_stats.lastConsumedTimestamp" means the last time of sending messages to
// the consumer.
assertEquals(0, cursorStats.messagesConsumedCounter);
Message<String> msg6 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg6.getValue(), "6");
Message<String> msg7 = reader.readNext(2, TimeUnit.SECONDS);
assertEquals(msg7.getValue(), "7");
Awaitility.await().untilAsserted(() -> {
// "messagesConsumedCounter" should be 2 after consumed 2 message.
ManagedLedgerInternalStats.CursorStats cStat =
admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
assertEquals(2, cStat.messagesConsumedCounter);
});

// cleanup.
reader.close();
producer.close();
admin.topics().delete(topicName, false);
}
}

0 comments on commit bdfeca1

Please sign in to comment.