Skip to content

Commit

Permalink
Fixed race condition in cursor.asyncDelete() (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 11, 2017
1 parent 6d4bd9d commit d1cb591
Showing 1 changed file with 54 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1173,58 +1173,54 @@ void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
* @return the previous acknowledged position
*/
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
lock.writeLock().lock();
try {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
}

if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
// read position forward
PositionImpl oldReadPosition = readPosition;
readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
}

if (log.isDebugEnabled()) {
log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
}
if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
// read position forward
PositionImpl oldReadPosition = readPosition;
readPosition = ledger.getNextValidPosition(newMarkDeletePosition);

oldReadPosition.recycle();
if (log.isDebugEnabled()) {
log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
}

PositionImpl oldMarkDeletePosition = markDeletePosition;
oldReadPosition.recycle();
}

if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
long skippedEntries = 0;
if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId()
&& newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) {
// Mark-deleting the position next to current one
skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1;
} else {
skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition));
}
PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) {
Range<PositionImpl> rangeToBeMarkDeleted = individualDeletedMessages
.rangeContaining(positionAfterNewMarkDelete);
newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint();
}
PositionImpl oldMarkDeletePosition = markDeletePosition;

if (log.isDebugEnabled()) {
log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition,
newMarkDeletePosition, skippedEntries);
}
messagesConsumedCounter += skippedEntries;
if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
long skippedEntries = 0;
if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId()
&& newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) {
// Mark-deleting the position next to current one
skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1;
} else {
skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition));
}
PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) {
Range<PositionImpl> rangeToBeMarkDeleted = individualDeletedMessages
.rangeContaining(positionAfterNewMarkDelete);
newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint();
}

// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
oldMarkDeletePosition.recycle();
} finally {
lock.writeLock().unlock();
if (log.isDebugEnabled()) {
log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition,
newMarkDeletePosition, skippedEntries);
}
messagesConsumedCounter += skippedEntries;
}

// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
oldMarkDeletePosition.recycle();

return newMarkDeletePosition;
}

Expand Down Expand Up @@ -1254,11 +1250,14 @@ public void asyncMarkDelete(final Position position, final MarkDeleteCallback ca
}
PositionImpl newPosition = (PositionImpl) position;

lock.writeLock().lock();
try {
newPosition = setAcknowledgedPosition(newPosition);
} catch (IllegalArgumentException e) {
callback.markDeleteFailed(new ManagedLedgerException(e), ctx);
return;
} finally {
lock.writeLock().unlock();
}

// Apply rate limiting to mark-delete operations
Expand Down Expand Up @@ -1487,6 +1486,12 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba
newMarkDeletePosition = range.upperEndpoint();
}
}

if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}
} catch (Exception e) {
log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
e.getMessage(), e);
Expand All @@ -1496,19 +1501,13 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba
lock.writeLock().unlock();
}

try {
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}

// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
callback.deleteComplete(ctx);
return;
}
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
callback.deleteComplete(ctx);
return;
}

try {
internalAsyncMarkDelete(newMarkDeletePosition, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
Expand Down

0 comments on commit d1cb591

Please sign in to comment.