Skip to content

Commit

Permalink
Fix for cursor reset after topic reload (#315)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 24, 2017
1 parent df0bf9e commit 80986d7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

// Read the last entry in the ledger
cursorLedger = lh;
lh.asyncReadLastEntry((rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("readComplete rc={} entryId={}", rc1, lh1.getLastAddConfirmed());
Expand Down Expand Up @@ -762,24 +761,17 @@ public void operationFailed(ManagedLedgerException exception) {

};

if (cursorLedger == null) {
persistPositionMetaStore(-1, newPosition, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}",
ledger.getName(), name, -1, markDeletePosition, readPosition);
cursorLedgerStat = stat;
finalCallback.operationComplete();
}
internalAsyncMarkDelete(newPosition, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
finalCallback.operationComplete();
}

@Override
public void operationFailed(MetaStoreException e) {
finalCallback.operationFailed(e);
}
});
} else {
persistPosition(cursorLedger, newPosition, finalCallback);
}
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
finalCallback.operationFailed(exception);
}
}, null);
}

@Override
Expand Down Expand Up @@ -1817,21 +1809,10 @@ public void operationFailed(ManagedLedgerException exception) {

private void flushPendingMarkDeletes() {
if (!pendingMarkDeleteOps.isEmpty()) {
if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE) {
failPendingMarkDeletes();
} else {
internalFlushPendingMarkDeletes();
}
internalFlushPendingMarkDeletes();
}
}

private void failPendingMarkDeletes() {
for (PendingMarkDeleteEntry e : pendingMarkDeleteOps) {
e.callback.markDeleteFailed(new ManagedLedgerException("reset cursor in progress"), e.ctx);
}
pendingMarkDeleteOps.clear();
}

void internalFlushPendingMarkDeletes() {
PendingMarkDeleteEntry lastEntry = pendingMarkDeleteOps.getLast();
lastEntry.callbackGroup = Lists.newArrayList(pendingMarkDeleteOps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,36 @@ public void testOfflineTopicBacklog() throws Exception {
factory.shutdown();
assertNotNull(offlineTopicStats);
}

@Test(timeOut = 20000)
void testResetCursorAfterRecovery() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc);
ManagedLedgerConfig conf = new ManagedLedgerConfig().setMaxEntriesPerLedger(10).setEnsembleSize(1)
.setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", conf);
ManagedCursor cursor = ledger.openCursor("trc1");
Position p1 = ledger.addEntry("dummy-entry-1".getBytes());
Position p2 = ledger.addEntry("dummy-entry-2".getBytes());
Position p3 = ledger.addEntry("dummy-entry-3".getBytes());
Position p4 = ledger.addEntry("dummy-entry-4".getBytes());

cursor.markDelete(p3);

ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, zkc);
ledger = factory2.open("my_test_move_cursor_ledger", conf);
cursor = ledger.openCursor("trc1");

assertEquals(cursor.getMarkDeletedPosition(), p3);
assertEquals(cursor.getReadPosition(), p4);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);

cursor.resetCursor(p2);
assertEquals(cursor.getMarkDeletedPosition(), p1);
assertEquals(cursor.getReadPosition(), p2);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 3);

factory2.shutdown();
factory.shutdown();
}
}

0 comments on commit 80986d7

Please sign in to comment.