Skip to content

Commit

Permalink
Revert "[fix] [ml] Reader can set read-pos to a deleted ledger (apach…
Browse files Browse the repository at this point in the history
…e#21248)"

This reverts commit dd28bb4.
  • Loading branch information
mukesh-ctds committed Apr 16, 2024
1 parent 6070b4d commit 99e6674
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 345 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1121,9 +1121,6 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
if (markDeletePosition.compareTo(ledger.getLastPosition()) >= 0) {
return 0;
}
return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3694,30 +3694,23 @@ public PositionImpl getPreviousPosition(PositionImpl position) {
* @return true if the position is valid, false otherwise
*/
public boolean isValidPosition(PositionImpl position) {
PositionImpl lac = lastConfirmedEntry;
PositionImpl last = lastConfirmedEntry;
if (log.isDebugEnabled()) {
log.debug("IsValid position: {} -- last: {}", position, lac);
log.debug("IsValid position: {} -- last: {}", position, last);
}

if (!ledgers.containsKey(position.getLedgerId())){
if (position.getEntryId() < 0) {
return false;
} else if (position.getEntryId() < 0) {
} else if (position.getLedgerId() > last.getLedgerId()) {
return false;
} else if (currentLedger != null && position.getLedgerId() == currentLedger.getId()) {
// If current ledger is empty, the largest read position can be "{current_ledger: 0}".
// Else, the read position can be set to "{LAC + 1}" when subscribe at LATEST,
return (position.getLedgerId() == lac.getLedgerId() && position.getEntryId() <= lac.getEntryId() + 1)
|| position.getEntryId() == 0;
} else if (position.getLedgerId() == lac.getLedgerId()) {
// The ledger witch maintains LAC was closed, and there is an empty current ledger.
// If entry id is larger than LAC, it should be "{current_ledger: 0}".
return position.getEntryId() <= lac.getEntryId();
} else if (position.getLedgerId() == last.getLedgerId()) {
return position.getEntryId() <= (last.getEntryId() + 1);
} else {
// Look in the ledgers map
LedgerInfo ls = ledgers.get(position.getLedgerId());

if (ls == null) {
if (position.getLedgerId() < lac.getLedgerId()) {
if (position.getLedgerId() < last.getLedgerId()) {
// Pointing to a non-existing ledger that is older than the current ledger is invalid
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
markDeletePosition = ledger.getPreviousPosition(this.readPosition);
markDeletePosition = mdPosition;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
Expand Down
Loading

0 comments on commit 99e6674

Please sign in to comment.