Fix for cursor reset after topic reload #315

Merged
merged 1 commit into from
Mar 24, 2017
@@ -240,7 +240,6 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

// Read the last entry in the ledger
cursorLedger = lh;
Contributor

So, if we don't initialize cursorLedger at recovery time, it will be null initially, and during that window the internal stats will not return the correct value of cursorLedgerLastEntry. Is there a specific reason not to initialize it at recovery time?

Contributor Author

Very good point. I removed it because it is confusing to use the same variable to store two different kinds of ledger handle (one read-write and the other read-only).

Anyway, I think the result still makes sense for the internal stats. In this case we would see:

cursorLedger = -1
cursorLedgerLastEntry = -1

and that is appropriate since we are in NoLedger state.

This is the code that returns -1:

public long getCursorLedger() {
    LedgerHandle lh = cursorLedger;
    return lh != null ? lh.getId() : -1;
}

public long getCursorLedgerLastEntry() {
    LedgerHandle lh = cursorLedger;
    return lh != null ? lh.getLastAddConfirmed() : -1;
}
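
For illustration only, here is a minimal self-contained sketch of how those getters behave while no cursor ledger is set. `CursorStatsSketch`, `Handle`, and `setCursorLedger` are hypothetical stand-ins (the real code uses BookKeeper's LedgerHandle), and the `volatile` modifier is an assumption made so the sketch is safe to read from another thread:

```java
// Sketch only: Handle is a hypothetical stand-in for BookKeeper's LedgerHandle.
class CursorStatsSketch {

    static final class Handle {
        private final long id;
        private final long lastAddConfirmed;

        Handle(long id, long lastAddConfirmed) {
            this.id = id;
            this.lastAddConfirmed = lastAddConfirmed;
        }

        long getId() { return id; }

        long getLastAddConfirmed() { return lastAddConfirmed; }
    }

    // Null while the cursor has no ledger of its own (the NoLedger case).
    // Declaring it volatile is an assumption of this sketch.
    private volatile Handle cursorLedger;

    void setCursorLedger(Handle handle) {
        this.cursorLedger = handle;
    }

    long getCursorLedger() {
        // Read the field once so the null check and the call use the same value.
        Handle lh = cursorLedger;
        return lh != null ? lh.getId() : -1;
    }

    long getCursorLedgerLastEntry() {
        Handle lh = cursorLedger;
        return lh != null ? lh.getLastAddConfirmed() : -1;
    }

    public static void main(String[] args) {
        CursorStatsSketch stats = new CursorStatsSketch();
        // No ledger yet: prints "-1 -1"
        System.out.println(stats.getCursorLedger() + " " + stats.getCursorLedgerLastEntry());

        stats.setCursorLedger(new Handle(7, 41));
        // Ledger set: prints "7 41"
        System.out.println(stats.getCursorLedger() + " " + stats.getCursorLedgerLastEntry());
    }
}
```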

lh.asyncReadLastEntry((rc1, lh1, seq, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("readComplete rc={} entryId={}", rc1, lh1.getLastAddConfirmed());
@@ -762,24 +761,17 @@ public void operationFailed(ManagedLedgerException exception) {

};

if (cursorLedger == null) {
persistPositionMetaStore(-1, newPosition, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}",
ledger.getName(), name, -1, markDeletePosition, readPosition);
cursorLedgerStat = stat;
finalCallback.operationComplete();
}
internalAsyncMarkDelete(newPosition, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
finalCallback.operationComplete();
}

@Override
public void operationFailed(MetaStoreException e) {
finalCallback.operationFailed(e);
}
});
} else {
persistPosition(cursorLedger, newPosition, finalCallback);
}
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
finalCallback.operationFailed(exception);
}
}, null);
}

@Override
@@ -1817,21 +1809,10 @@ public void operationFailed(ManagedLedgerException exception) {

private void flushPendingMarkDeletes() {
if (!pendingMarkDeleteOps.isEmpty()) {
if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE) {
failPendingMarkDeletes();
} else {
internalFlushPendingMarkDeletes();
}
internalFlushPendingMarkDeletes();
Contributor

internalFlushPendingMarkDeletes just clears the previous pendingMarkDeleteOps without completing the callbacks of the PendingMarkDeleteEntry items, so anyone waiting on one of those callbacks could be left hanging. For example, PersistentReplicator uses the callback only for a debug log, which is fine, but since the callback exists, someone could wait on it.

Contributor Author

Actually, that is covered 😉

Take a look at internalMarkDelete(). It contains this code:

// Trigger the final callback after having (eventually) triggered the switching-ledger operation. This
// will ensure that no race condition will happen between the next mark-delete and the switching
// operation.
if (mdEntry.callbackGroup != null) {
    // Trigger the callback for every request in the group
    for (PendingMarkDeleteEntry e : mdEntry.callbackGroup) {
        e.callback.markDeleteComplete(e.ctx);
    }
} else {
    // Only trigger the callback for the current request
    mdEntry.callback.markDeleteComplete(mdEntry.ctx);
}

So a single write is issued, and its completion triggers all the callbacks.
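
To make the grouping concrete, here is a rough, self-contained sketch of the idea, not the actual ManagedCursorImpl code. The class name, the `PendingEntry` type, the `enqueue`/`flush`/`write` methods, the use of plain `long` positions, and the simulated synchronous write are all assumptions made for illustration:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of coalescing queued mark-delete requests.
class MarkDeleteCoalescingSketch {

    interface Callback {
        void markDeleteComplete(Object ctx);
    }

    static final class PendingEntry {
        final long position;
        final Callback callback;
        final Object ctx;
        // Set only on the entry that is actually written; lists every queued request.
        List<PendingEntry> callbackGroup;

        PendingEntry(long position, Callback callback, Object ctx) {
            this.position = position;
            this.callback = callback;
            this.ctx = ctx;
        }
    }

    private final ArrayDeque<PendingEntry> pending = new ArrayDeque<>();
    int writes = 0;               // number of simulated persistence operations
    long persistedPosition = -1;  // last position that was "written"

    void enqueue(long position, Callback callback, Object ctx) {
        pending.add(new PendingEntry(position, callback, ctx));
    }

    // Persist only the most recent position, but remember every queued request.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        PendingEntry last = pending.getLast();
        last.callbackGroup = new ArrayList<>(pending);
        pending.clear();
        write(last);
    }

    // Simulated write: completes synchronously, then notifies the callers.
    private void write(PendingEntry entry) {
        writes++;
        persistedPosition = entry.position;
        if (entry.callbackGroup != null) {
            // Trigger the callback for every request in the group
            for (PendingEntry e : entry.callbackGroup) {
                e.callback.markDeleteComplete(e.ctx);
            }
        } else {
            // Only trigger the callback for the current request
            entry.callback.markDeleteComplete(entry.ctx);
        }
    }

    public static void main(String[] args) {
        MarkDeleteCoalescingSketch cursor = new MarkDeleteCoalescingSketch();
        List<Object> completed = new ArrayList<>();
        for (long pos = 1; pos <= 3; pos++) {
            cursor.enqueue(pos, completed::add, "req-" + pos);
        }
        cursor.flush();
        // prints: writes=1 completed=[req-1, req-2, req-3]
        System.out.println("writes=" + cursor.writes + " completed=" + completed);
    }
}
```

In this model, three queued requests result in one write of the latest position, and no caller is left waiting because every request in the group is completed.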

Contributor

I see.

}
}

private void failPendingMarkDeletes() {
for (PendingMarkDeleteEntry e : pendingMarkDeleteOps) {
e.callback.markDeleteFailed(new ManagedLedgerException("reset cursor in progress"), e.ctx);
}
pendingMarkDeleteOps.clear();
}

void internalFlushPendingMarkDeletes() {
PendingMarkDeleteEntry lastEntry = pendingMarkDeleteOps.getLast();
lastEntry.callbackGroup = Lists.newArrayList(pendingMarkDeleteOps);
@@ -426,4 +426,36 @@ public void testOfflineTopicBacklog() throws Exception {
factory.shutdown();
assertNotNull(offlineTopicStats);
}

@Test(timeOut = 20000)
void testResetCursorAfterRecovery() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc);
ManagedLedgerConfig conf = new ManagedLedgerConfig().setMaxEntriesPerLedger(10).setEnsembleSize(1)
.setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", conf);
ManagedCursor cursor = ledger.openCursor("trc1");
Position p1 = ledger.addEntry("dummy-entry-1".getBytes());
Position p2 = ledger.addEntry("dummy-entry-2".getBytes());
Position p3 = ledger.addEntry("dummy-entry-3".getBytes());
Position p4 = ledger.addEntry("dummy-entry-4".getBytes());

cursor.markDelete(p3);

ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, zkc);
ledger = factory2.open("my_test_move_cursor_ledger", conf);
cursor = ledger.openCursor("trc1");

assertEquals(cursor.getMarkDeletedPosition(), p3);
assertEquals(cursor.getReadPosition(), p4);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);

cursor.resetCursor(p2);
assertEquals(cursor.getMarkDeletedPosition(), p1);
assertEquals(cursor.getReadPosition(), p2);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 3);

factory2.shutdown();
factory.shutdown();
}
}
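
The positions asserted in testResetCursorAfterRecovery can be checked with a toy model. This is only a sketch under simplifying assumptions: positions are plain integers 1..4 within a single ledger, the read position always sits right after the mark-delete position (as it does immediately after recovery in the test), and `ResetCursorModel` is a hypothetical class, not part of the managed-ledger API:

```java
// Toy model of the arithmetic behind the test assertions; not the real cursor.
class ResetCursorModel {

    private final long lastEntry;   // index of the last entry written to the ledger
    private long markDeleted = 0;   // 0 means nothing has been acknowledged yet

    ResetCursorModel(long lastEntry) {
        this.lastEntry = lastEntry;
    }

    void markDelete(long position) {
        markDeleted = position;
    }

    // Resetting to a position makes that position the next one to be read,
    // so the mark-delete position becomes the entry just before it.
    void resetCursor(long position) {
        markDeleted = position - 1;
    }

    long getMarkDeleted() {
        return markDeleted;
    }

    long getReadPosition() {
        return markDeleted + 1;
    }

    long getBacklog() {
        return lastEntry - markDeleted;
    }

    public static void main(String[] args) {
        ResetCursorModel cursor = new ResetCursorModel(4); // entries p1..p4
        cursor.markDelete(3);
        // prints: md=3 read=4 backlog=1
        System.out.println("md=" + cursor.getMarkDeleted() + " read=" + cursor.getReadPosition()
                + " backlog=" + cursor.getBacklog());

        cursor.resetCursor(2);
        // prints: md=1 read=2 backlog=3
        System.out.println("md=" + cursor.getMarkDeleted() + " read=" + cursor.getReadPosition()
                + " backlog=" + cursor.getBacklog());
    }
}
```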