Skip to content

Commit

Permalink
[fix] [ml] Reader can set read-pos to a deleted ledger (#21248)
Browse files Browse the repository at this point in the history
### Motivation

After trimming ledgers, the variable `lastConfirmedEntry` of the managed ledger might rely on a deleted ledger(the latest ledger which contains data).

There is a bug that makes pulsar allow users to set the start read position to an unexisting ledger or a deleted ledger when creating a subscription. This makes the `backlog` and `markDeletedPosition` wrong. 

### Modifications

Fix the bug.
  • Loading branch information
poorbarcode authored Oct 8, 2023
1 parent f85e0dc commit 4ee5cd7
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,9 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
return 0;
}
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3630,23 +3630,30 @@ public PositionImpl getPreviousPosition(PositionImpl position) {
* @return true if the position is valid, false otherwise
*/
public boolean isValidPosition(PositionImpl position) {
PositionImpl last = lastConfirmedEntry;
PositionImpl lac = lastConfirmedEntry;
if (log.isDebugEnabled()) {
log.debug("IsValid position: {} -- last: {}", position, last);
log.debug("IsValid position: {} -- last: {}", position, lac);
}

if (position.getEntryId() < 0) {
if (!ledgers.containsKey(position.getLedgerId())){
return false;
} else if (position.getLedgerId() > last.getLedgerId()) {
} else if (position.getEntryId() < 0) {
return false;
} else if (position.getLedgerId() == last.getLedgerId()) {
return position.getEntryId() <= (last.getEntryId() + 1);
} else if (currentLedger != null && position.getLedgerId() == currentLedger.getId()) {
// If current ledger is empty, the largest read position can be "{current_ledger: 0}".
// Else, the read position can be set to "{LAC + 1}" when subscribe at LATEST,
return (position.getLedgerId() == lac.getLedgerId() && position.getEntryId() <= lac.getEntryId() + 1)
|| position.getEntryId() == 0;
} else if (position.getLedgerId() == lac.getLedgerId()) {
// The ledger witch maintains LAC was closed, and there is an empty current ledger.
// If entry id is larger than LAC, it should be "{current_ledger: 0}".
return position.getEntryId() <= lac.getEntryId();
} else {
// Look in the ledgers map
LedgerInfo ls = ledgers.get(position.getLedgerId());

if (ls == null) {
if (position.getLedgerId() < last.getLedgerId()) {
if (position.getLedgerId() < lac.getLedgerId()) {
// Pointing to a non-existing ledger that is older than the current ledger is invalid
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
markDeletePosition = mdPosition;
markDeletePosition = ledger.getPreviousPosition(this.readPosition);

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
Expand All @@ -40,6 +43,8 @@
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
Expand Down Expand Up @@ -341,4 +346,215 @@ public void testTrimLedgerIfNoDurableCursor() throws Exception {
producer.close();
admin.topics().delete(topicName);
}

@Test
public void testInitReaderAtSpecifiedPosition() throws Exception {
String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, "s0", MessageId.earliest);

// Trigger 5 ledgers.
ArrayList<Long> ledgers = new ArrayList<>();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
for (int i = 0; i < 5; i++) {
MessageIdImpl msgId = (MessageIdImpl) producer.send("1");
ledgers.add(msgId.getLedgerId());
admin.topics().unload(topicName);
}
producer.close();
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
LedgerHandle currentLedger = WhiteboxImpl.getInternalState(ml, "currentLedger");
log.info("currentLedger: {}", currentLedger.getId());

// Less than the first ledger, and entry id is "-1".
log.info("start test s1");
String s1 = "s1";
MessageIdImpl startMessageId1 = new MessageIdImpl(ledgers.get(0) - 1, -1, -1);
Reader<String> reader1 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s1)
.receiverQueueSize(0).startMessageId(startMessageId1).create();
ManagedLedgerInternalStats.CursorStats cursor1 = admin.topics().getInternalStats(topicName).cursors.get(s1);
log.info("cursor1 readPosition: {}, markDeletedPosition: {}", cursor1.readPosition, cursor1.markDeletePosition);
PositionImpl p1 = parseReadPosition(cursor1);
assertEquals(p1.getLedgerId(), ledgers.get(0));
assertEquals(p1.getEntryId(), 0);
reader1.close();

// Less than the first ledger, and entry id is Long.MAX_VALUE.
log.info("start test s2");
String s2 = "s2";
MessageIdImpl startMessageId2 = new MessageIdImpl(ledgers.get(0) - 1, Long.MAX_VALUE, -1);
Reader<String> reader2 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s2)
.receiverQueueSize(0).startMessageId(startMessageId2).create();
ManagedLedgerInternalStats.CursorStats cursor2 = admin.topics().getInternalStats(topicName).cursors.get(s2);
log.info("cursor2 readPosition: {}, markDeletedPosition: {}", cursor2.readPosition, cursor2.markDeletePosition);
PositionImpl p2 = parseReadPosition(cursor2);
assertEquals(p2.getLedgerId(), ledgers.get(0));
assertEquals(p2.getEntryId(), 0);
reader2.close();

// Larger than the latest ledger, and entry id is "-1".
log.info("start test s3");
String s3 = "s3";
MessageIdImpl startMessageId3 = new MessageIdImpl(currentLedger.getId() + 1, -1, -1);
Reader<String> reader3 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s3)
.receiverQueueSize(0).startMessageId(startMessageId3).create();
ManagedLedgerInternalStats.CursorStats cursor3 = admin.topics().getInternalStats(topicName).cursors.get(s3);
log.info("cursor3 readPosition: {}, markDeletedPosition: {}", cursor3.readPosition, cursor3.markDeletePosition);
PositionImpl p3 = parseReadPosition(cursor3);
assertEquals(p3.getLedgerId(), currentLedger.getId());
assertEquals(p3.getEntryId(), 0);
reader3.close();

// Larger than the latest ledger, and entry id is Long.MAX_VALUE.
log.info("start test s4");
String s4 = "s4";
MessageIdImpl startMessageId4 = new MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1);
Reader<String> reader4 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s4)
.receiverQueueSize(0).startMessageId(startMessageId4).create();
ManagedLedgerInternalStats.CursorStats cursor4 = admin.topics().getInternalStats(topicName).cursors.get(s4);
log.info("cursor4 readPosition: {}, markDeletedPosition: {}", cursor4.readPosition, cursor4.markDeletePosition);
PositionImpl p4 = parseReadPosition(cursor4);
assertEquals(p4.getLedgerId(), currentLedger.getId());
assertEquals(p4.getEntryId(), 0);
reader4.close();

// Ledger id and entry id both are Long.MAX_VALUE.
log.info("start test s5");
String s5 = "s5";
MessageIdImpl startMessageId5 = new MessageIdImpl(currentLedger.getId() + 1, Long.MAX_VALUE, -1);
Reader<String> reader5 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s5)
.receiverQueueSize(0).startMessageId(startMessageId5).create();
ManagedLedgerInternalStats.CursorStats cursor5 = admin.topics().getInternalStats(topicName).cursors.get(s5);
log.info("cursor5 readPosition: {}, markDeletedPosition: {}", cursor5.readPosition, cursor5.markDeletePosition);
PositionImpl p5 = parseReadPosition(cursor5);
assertEquals(p5.getLedgerId(), currentLedger.getId());
assertEquals(p5.getEntryId(), 0);
reader5.close();

// Ledger id equals LAC, and entry id is "-1".
log.info("start test s6");
String s6 = "s6";
MessageIdImpl startMessageId6 = new MessageIdImpl(ledgers.get(ledgers.size() - 1), -1, -1);
Reader<String> reader6 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s6)
.receiverQueueSize(0).startMessageId(startMessageId6).create();
ManagedLedgerInternalStats.CursorStats cursor6 = admin.topics().getInternalStats(topicName).cursors.get(s6);
log.info("cursor6 readPosition: {}, markDeletedPosition: {}", cursor6.readPosition, cursor6.markDeletePosition);
PositionImpl p6 = parseReadPosition(cursor6);
assertEquals(p6.getLedgerId(), ledgers.get(ledgers.size() - 1));
assertEquals(p6.getEntryId(), 0);
reader6.close();

// Larger than the latest ledger, and entry id is Long.MAX_VALUE.
log.info("start test s7");
String s7 = "s7";
MessageIdImpl startMessageId7 = new MessageIdImpl(ledgers.get(ledgers.size() - 1), Long.MAX_VALUE, -1);
Reader<String> reader7 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s7)
.receiverQueueSize(0).startMessageId(startMessageId7).create();
ManagedLedgerInternalStats.CursorStats cursor7 = admin.topics().getInternalStats(topicName).cursors.get(s7);
log.info("cursor7 readPosition: {}, markDeletedPosition: {}", cursor7.readPosition, cursor7.markDeletePosition);
PositionImpl p7 = parseReadPosition(cursor7);
assertEquals(p7.getLedgerId(), currentLedger.getId());
assertEquals(p7.getEntryId(), 0);
reader7.close();

// A middle ledger id, and entry id is "-1".
log.info("start test s8");
String s8 = "s8";
MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0, -1);
Reader<String> reader8 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8)
.receiverQueueSize(0).startMessageId(startMessageId8).create();
ManagedLedgerInternalStats.CursorStats cursor8 = admin.topics().getInternalStats(topicName).cursors.get(s8);
log.info("cursor8 readPosition: {}, markDeletedPosition: {}", cursor8.readPosition, cursor8.markDeletePosition);
PositionImpl p8 = parseReadPosition(cursor8);
assertEquals(p8.getLedgerId(), ledgers.get(2));
assertEquals(p8.getEntryId(), 0);
reader8.close();

// Larger than the latest ledger, and entry id is Long.MAX_VALUE.
log.info("start test s9");
String s9 = "s9";
MessageIdImpl startMessageId9 = new MessageIdImpl(ledgers.get(2), Long.MAX_VALUE, -1);
Reader<String> reader9 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s9)
.receiverQueueSize(0).startMessageId(startMessageId9).create();
ManagedLedgerInternalStats.CursorStats cursor9 = admin.topics().getInternalStats(topicName).cursors.get(s9);
log.info("cursor9 readPosition: {}, markDeletedPosition: {}", cursor9.readPosition,
cursor9.markDeletePosition);
PositionImpl p9 = parseReadPosition(cursor9);
assertEquals(p9.getLedgerId(), ledgers.get(3));
assertEquals(p9.getEntryId(), 0);
reader9.close();

// Larger than the latest ledger, and entry id equals with the max entry id of this ledger.
log.info("start test s10");
String s10 = "s10";
MessageIdImpl startMessageId10 = new MessageIdImpl(ledgers.get(2), 0, -1);
Reader<String> reader10 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s10)
.receiverQueueSize(0).startMessageId(startMessageId10).create();
ManagedLedgerInternalStats.CursorStats cursor10 = admin.topics().getInternalStats(topicName).cursors.get(s10);
log.info("cursor10 readPosition: {}, markDeletedPosition: {}", cursor10.readPosition, cursor10.markDeletePosition);
PositionImpl p10 = parseReadPosition(cursor10);
assertEquals(p10.getLedgerId(), ledgers.get(2));
assertEquals(p10.getEntryId(), 0);
reader10.close();

// cleanup
admin.topics().delete(topicName, false);
}

private PositionImpl parseReadPosition(ManagedLedgerInternalStats.CursorStats cursorStats) {
String[] ledgerIdAndEntryId = cursorStats.readPosition.split(":");
return PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
}

@Test
public void testReaderInitAtDeletedPosition() throws Exception {
String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(topicName);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
producer.send("1");
producer.send("2");
producer.send("3");
MessageIdImpl msgIdInDeletedLedger4 = (MessageIdImpl) producer.send("4");
MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5");

// Trigger a trim ledgers task, and verify trim ledgers successful.
admin.topics().unload(topicName);
trimLedgers(topicName);
List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topicName).ledgers;
assertEquals(ledgers.size(), 1);
assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId());

// Start a reader at a deleted ledger.
MessageIdImpl startMessageId =
new MessageIdImpl(msgIdInDeletedLedger4.getLedgerId(), msgIdInDeletedLedger4.getEntryId(), -1);
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName("s1")
.startMessageId(startMessageId).create();
Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
Assert.assertNull(msg1);

// Verify backlog and markDeletePosition is correct.
Awaitility.await().untilAsserted(() -> {
SubscriptionStats subscriptionStats = admin.topics()
.getStats(topicName, true, true, true).getSubscriptions().get("s1");
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
ManagedLedgerInternalStats.CursorStats cursorStats =
admin.topics().getInternalStats(topicName).cursors.get("s1");
String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
PositionImpl actMarkDeletedPos =
PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
PositionImpl expectedMarkDeletedPos =
PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
});

// cleanup.
reader.close();
producer.close();
admin.topics().delete(topicName, false);
}
}

0 comments on commit 4ee5cd7

Please sign in to comment.